How to prefetch data when processing with GPU?

@harpone Yes, various blob storages can work; it’s a common setup for Tensorflow training with Google ml-engine. Datasets can be stored in GCS or BigTable, and there are C++ ops wrapped in a Python API that tie that together with the TFRecord format and their data API. Edit: And thanks for sharing the Azure blob dataset, curious how large each file is for the scenario you’re targeting?

Cloud blob storage has its own rough equivalent of an IOPS limitation (for this task): requests/sec. Even though the throughput of cloud-to-cloud network transfers is really high, requesting data over a network has much higher latency (bounded by a multiple of the RTT) than local storage. You have to design the system carefully to mitigate that latency, either by increasing the size of, and reducing the number of, sequential requests, or by dispatching many async parallel requests. The easiest solution for large blocks is to use a record format. For many parallel requests, I’d probably use an efficient/scalable distributed database that can store your data natively (i.e. binary as binary, text/JSON as text/JSON).
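For illustration, the many-parallel-requests idea in its simplest Python form, using a thread pool to overlap the network round trips (the endpoint and key layout here are made up, and a real blob-storage SDK would replace the plain HTTP calls):

from concurrent.futures import ThreadPoolExecutor

import requests  # stand-in for a real blob-storage client

BASE_URL = "https://example-blob-store/my-bucket/"  # hypothetical endpoint

def fetch(key):
    # Each request pays a full round trip, so issue many of them concurrently
    # to hide the per-request latency behind one another.
    return requests.get(BASE_URL + key, timeout=30).content

keys = [f"train/{i:08d}.jpg" for i in range(1024)]
with ThreadPoolExecutor(max_workers=64) as pool:
    blobs = list(pool.map(fetch, keys))  # network waits are overlapped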

I wouldn’t want to write the requesting and parsing code for any of the above in Python though. You don’t actually need many CPUs if you aren’t stuck in Python; you want those CPUs doing more useful things. With a small pool of worker threads and a decent async (or at least non-blocking) IO subsystem, you can move and parse a lot of data efficiently using a systems language. This is hard to do effectively in Python.

This ‘Python for the interface and C/C++ for the dirty work’ paradigm is a little saddening when you realize you need to do some of the dirty work yourself. I’m quite looking forward to seeing how the Tensorflow Swift experiment works out. If Swift gains momentum as both a viable server language and an ML language, it could make some of the systems work for supporting ML much less of a chore. SwiftTorch FTW? :slight_smile:

2 Likes

If I want to quickly achieve a bit more scale for a training project in the cloud (without writing extra code), I usually just use GCE and a directly attached persistent SSD. Unlike AWS, in GCE you can attach a single persistent SSD to multiple instances as long as you mount it read-only on all of them.

I use scripts to move data from long-term storage to a new persistent SSD, then attach it read-only to multiple training instances at the same mount point. Delete the SSD when done with that experiment.

2 Likes

I’ve been hoping to see someone mention “SwiftTorch” for a while. This would be a dream come true. I fell in love with the language while spending some time last year working on iOS-based projects. If I get some free time in the next few months, I would like to play around with making a proof-of-concept front end for some of the torch functionality. There may be an easy way to do it by leveraging Objective-C/C++ as a bridge. My plan is to wrap the PyTorch C++ API in Objective-C++ wrapper classes, which can be exposed directly to Swift.

I forgot to mention, quite crucially, that the idea was to generalize a “FolderDataset” to the blob storage case, i.e. the data is just a bunch of JPEGs, exactly because we would prefer to keep the data as close to the original as possible for debugging and other reasons…

It would indeed be nice to have a one language solution for all… you mentioned Swift, but how about Julia?

Thanks for the advice again @rwightman! GCE’s persistent data SSD sounds really useful!

What about moving your dataset to /dev/shm on Linux, could this help? Or is it detrimental?
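Concretely, what I have in mind is something like the following (the paths are made up); /dev/shm is a RAM-backed tmpfs, so I assume this only helps if the dataset fits in RAM and disk latency, rather than CPU decode time, is the limit:

import shutil
from torchvision import datasets, transforms

# Copy the dataset into the RAM-backed tmpfs once (hypothetical paths),
# then point the usual ImageFolder at it.
shutil.copytree("/data/imagenet/train", "/dev/shm/train")

train_dataset = datasets.ImageFolder(
    "/dev/shm/train",
    transforms.Compose([transforms.RandomResizedCrop(224), transforms.ToTensor()]),
)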

I’m curious if you’ve taken a look at DALI?

I’m spending some time going through the code now to try to get a sense of what’s going on. It’s computer vision specific, but in another thread about speeding up the dataloader there were claims of an 8x speedup, although I’m not sure how much of that comes from doing the image preprocessing on the GPU.

I’m working with tabular data, which it doesn’t support, but I’m trying to figure out some of their caching strategies and techniques so I can work them into the dataloader I’m building to go along with RAPIDS.AI. (I work for NVIDIA on the RAPIDS team, focused on deep learning for tabular data.)

1 Like

Hi,
DALI can partially solve your problems. The basic idea behind DALI was to move data processing to the GPU, as the CPU does not scale as well as the GPU and the GPU/CPU processing-power ratio keeps increasing over time.

It provides (see the pipeline sketch after these lists):

  • GPU acceleration of image processing, so if the CPU is the bottleneck it will help
  • a transparent way to prefetch data - you can select how many batches ahead you want stored in the pre-processing queue, which matters a lot when the processing time varies significantly from batch to batch
  • an easy way to jump between different data storage formats without writing any custom code for it; currently you can select between LMDB, TFRecord, RecordIO, single files and COCO
  • moving data to the GPU in the background
  • shuffling even for the sequential data formats - DALI keeps an internal buffer that is filled by reading the data sequentially and then randomly sampled to form a batch

However, DALI will not:

  • help much if you are not CPU bound - e.g. your network is heavy and the GPU is already occupied, or you have a big CPU/GPU ratio. Still, you can use DALI with CPU-based operators for the flexibility and then, when your network is lightweight, just change the operator flavour from CPU to GPU
  • help if you are IO bound - DALI won’t accelerate your disk access
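For reference, a pipeline set up this way looks roughly like the sketch below. It is based on the DALI Python API as I remember it; operator names have shifted a bit between releases, so treat it as illustrative rather than copy-paste ready:

from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

class TrainPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, image_dir):
        # prefetch_queue_depth controls how many batches are prepared ahead of time
        super().__init__(batch_size, num_threads, device_id, prefetch_queue_depth=2)
        self.input = ops.FileReader(file_root=image_dir, random_shuffle=True)
        self.decode = ops.ImageDecoder(device="mixed", output_type=types.RGB)  # hybrid CPU/GPU JPEG decode
        self.resize = ops.Resize(device="gpu", resize_x=224, resize_y=224)

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.resize(self.decode(jpegs))
        return images, labels  # images are already resident on the GPU

Such a pipeline is then usually wrapped with the PyTorch plugin iterator (nvidia.dali.plugin.pytorch) so that batches arrive as CUDA tensors.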
4 Likes

@Even_Oldridge @JanuszL I’m aware of DALI and familiar with it at a high level, but I haven’t had a chance to try it. I’ve generally been able to find a good balance of CPU vs GPU utilization on image-based projects without it. I was going to take a closer look at the data loading pipeline at some point. Definitely some goodies in there.

My philosophy is that if you can do the data preprocessing on the CPU without bumping into the limits, it’s best to do so and keep the GPU 100% busy learning. When the CPUs can no longer keep up, it seems like a good option to look at.

Where I see DALI providing a lot of value for me (in the future) is with video. H.264/H.265 isn’t feasible to decode on CPUs at the rate needed for DL, especially if you’re trying to move around in larger segments with sporadic IDR frames. Decoding on the GPU into on-card memory that can be fed directly into the NN is key there. I just haven’t hit that point yet :slight_smile:

Since we’ve got an author of DALI here, I’m curious: how much memory overhead is there when preprocessing the images on the GPU, taking into consideration the ratio of the original image size to the NN input size? Is there a rough rule/formula? Naively, if one were to try to orchestrate this from most frameworks in Python, you’d have to copy into GPU tensors at the original images’ full size before doing any downscale + augmentation, thus using up a lot of valuable GPU memory. How efficient is DALI at this when it comes to doing the JPEG decode -> preprocess -> network input size chain?

5 Likes

@rwightman I totally agree that moving things to the GPU should only be done when your CPU is overworked. It all depends on the dataset you have (the bigger the images, the more data processing, especially decoding), what DL framework you are using, and how computationally intensive your network is. E.g. for ResNet-50 we can easily saturate the GPU, while ResNet-18 will rather starve for data.

Regarding video support, we are still at a very early stage; we have a decoder and just basic operators, to show how it works and to learn what kinds of use cases people have for it. We really count on external contributions to help us extend support for such workloads.

Regarding memory, there are a lot of factors; mostly it depends on what your pipeline looks like. DALI does not execute in place and processes data in batches, so we need an intermediate buffer between every pair of operators. For every operator n we need roughly Xn * Yn * N bytes (Xn, Yn being the operator’s output dimensions and N the number of samples in the batch). Some operators, like the decoder, can have big images at the output, each of a different size, while others, like crop, have a fixed-size output. DALI also uses multiple buffering, so we need to add space for the output buffers as well. As a final formula, I would count the worst-case sizes of the images in the batch at each operator’s output, plus prefetch-queue-depth output buffers. Some operators, like resize, also need a scratch buffer of their own. We are aware that DALI’s memory consumption is far from perfect and we want to improve it, but without sacrificing performance; that is why this will be part of a more significant architecture rework.
As a side note, to improve memory consumption a bit (at least for the decoder), DALI provides ROI-based decoding (thanks to nvJPEG), so you can save some memory by not decoding the whole image, only the part of it that will be processed later (as cropping is usually part of the processing pipeline).
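To make that estimate concrete, here is a back-of-the-envelope sketch; the operator shapes below are hypothetical worst cases I picked for illustration, not DALI internals:

def dali_memory_estimate(batch_size, op_output_shapes, prefetch_depth=2, dtype_bytes=1):
    # Rough upper bound (in bytes) following the description above: one
    # worst-case buffer per intermediate operator output, plus
    # `prefetch_depth` copies of the final output for multiple buffering.
    per_sample = [h * w * c for (h, w, c) in op_output_shapes]
    intermediate = sum(per_sample[:-1]) * batch_size * dtype_bytes
    outputs = per_sample[-1] * batch_size * dtype_bytes * prefetch_depth
    return intermediate + outputs

# e.g. decode -> resize -> crop for a batch of 256 uint8 images
estimate = dali_memory_estimate(256, [(1080, 1920, 3),   # worst-case decoded JPEG
                                      (256, 256, 3),     # resized
                                      (224, 224, 3)])    # network input
print(estimate / 2**30, "GiB")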

Correct me if I’m wrong.

With num_workers > 1, the dataloader creates num_workers threads for data loading. Each thread maintains a pool of batch_size images, and each time the dataloader just fetches images from a pool that is fully loaded. This causes several problems when batch_size is very large (1024, 2048, …):

  1. The memory usage is large: batch_size x num_workers x img_height x img_width x img_channels x 4 (float) bytes. It can easily be 11 GB or more, e.g. 1024 * 20 * 224 * 224 * 3 * 4 ≈ 11.5 GB
  2. An obvious delay occurs every num_workers batches, just like Weifeng shows.

Maybe we could make all threads share one pool (a queue) of n * batch_size images, with all num_workers threads loading data into this pool. Each time, the dataloader would just pull batch_size images from it. n can be reasonably small (< num_workers) to reduce memory usage. A rough sketch of this idea is below.
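Toy code for what I mean, using Python threads and a bounded queue (the helper name is made up, and as the reply below points out, CPython’s GIL and the process-based DataLoader make this harder in practice):

import queue
import threading

def shared_pool_loader(dataset, batch_size, num_workers, pool_batches=2):
    # All workers feed one bounded pool of samples; the consumer assembles
    # batches from it, so only ~pool_batches * batch_size samples are buffered.
    pool = queue.Queue(maxsize=pool_batches * batch_size)
    indices = iter(range(len(dataset)))
    lock = threading.Lock()

    def worker():
        while True:
            with lock:
                idx = next(indices, None)
            if idx is None:
                return
            pool.put(dataset[idx])  # blocks while the pool is full

    for _ in range(num_workers):
        threading.Thread(target=worker, daemon=True).start()

    remaining = len(dataset)
    while remaining > 0:
        n = min(batch_size, remaining)
        yield [pool.get() for _ in range(n)]
        remaining -= n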

BTW, multiprocessing distributed training is much faster than DataParallel in the large-batch_size case, even on a single node with multiple GPUs. The reason might be that each process only needs to load batch_size / gpus_per_node images.

1 Like

There is a distinction that makes all the difference here. Thanks to the limitations of Python, it’s not threads we’re dealing with but processes. Each contiguous batch tensor is assembled by each worker using the collate fn in a separate process when num_workers > 0. Python multiprocessing Queues (with some Tensor handling additions) are used to get those batch tensors to the main process via shared memory.

If it were threads, and we were in a language like C++, the memory would all be in the same process. With lightweight thread-sync primitives, you could have enough fine-grained control to interleave loading of samples into batches without needing as many batch tensors in flight. That doesn’t really work well (or at all) with multiple processes and the limited control you have in Python.
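As a toy illustration of the process + queue pattern described above (this is not the actual DataLoader implementation; the names and the stand-in dataset are made up):

import torch
import torch.multiprocessing as mp

def batch_worker(dataset, index_q, batch_q):
    # Toy worker process: collates whole batches and sends the resulting
    # tensors back through a queue (torch.multiprocessing passes them via
    # shared memory rather than copying through a pipe).
    while True:
        indices = index_q.get()
        if indices is None:
            return
        batch_q.put(torch.stack([dataset[i] for i in indices]))

if __name__ == '__main__':
    dataset = torch.randn(1024, 3, 32, 32)  # stand-in dataset
    index_q, batch_q = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=batch_worker, args=(dataset, index_q, batch_q)) for _ in range(2)]
    for w in workers:
        w.start()
    for start in range(0, 1024, 256):
        index_q.put(list(range(start, start + 256)))
    for _ in range(1024 // 256):
        batch = batch_q.get()  # full batch tensors arrive in the parent process
    for _ in workers:
        index_q.put(None)
    for w in workers:
        w.join()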

2 Likes

@suruoxi - it doesn’t really work like that. As I said, when DALI is used, one DALI instance is created per GPU, so you have prefetch_depth x num_of_GPU x per_GPU_batch_size x image_size of memory consumed per node. Regarding the batch sizes you mentioned as an example (1024, 2048…), that is rather a global per-node batch size, not per GPU (or am I wrong?). If you train networks using over 1k samples in a batch per GPU, could you provide some examples? For now, we usually use batches of ~256 per GPU on a single node.

For multi-node training, the per-node batch size is definitely even smaller.
Nevertheless, we are aware that this may not be acceptable in some configurations, and we have an idea to make DALI process smaller sub-batches and assemble them into one at the end of the pipeline, so the intermediate buffers are smaller. We haven’t gotten to this yet and I cannot provide any timeline for when it will be implemented. Anyone who wants to contribute to the project by doing that is more than welcome.

@JanuszL @rwightman What are the rules of thumb to choose an optimal number of workers? For example, I have 4 Titan Pascal GPUs and i7-5930K (6 physical/12 logical cores) CPU. How many workers should I use for training ResNet-18 (batch size 256)? What about ResNet-50? How does this change if I switch from default Pytorch DataParallel to DALI?

@michaelklachko - I would say the more the better, as long as you are not starving PyTorch itself of CPU time. In your case, you have 3 logical CPU cores per GPU (if I understand your setup correctly). If you set more than 3 workers per GPU, they would just fight for CPU time, so I would suggest trying 2 and keeping 1 free for PyTorch itself. You can also check how 3 and 1 behave.
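Expressed as code, the rule of thumb I use looks roughly like this (my own heuristic, not an official recommendation):

import os
import torch

# Leave one logical core per GPU for the training process itself and give
# the rest to DataLoader workers.
gpus = max(1, torch.cuda.device_count())
logical_cores = os.cpu_count()
workers_per_gpu = max(1, logical_cores // gpus - 1)
print(f"{gpus} GPUs, {logical_cores} logical cores -> try num_workers={workers_per_gpu} per GPU")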

1 Like

I usually use 4-6 workers per GPU with a DistributedDataParallel setup (each GPU has its own process with its own worker processes). I try to keep the total cumulative number of worker + training processes <= the logical core count. You’ll also want to watch the memory usage when upping the workers: if you start exhausting your physical memory, try pulling back the number of workers and see if that keeps you below the exhaustion threshold.

I’m usually CPU bound for my training. I’ve upgraded the drives I train from to NVMe or SSD (for big datasets) and am usually working with ‘larger’ images, so IOPS and disk throughput are not in the way.

With ResNet-50 + 4 GPUs you might find a decent balance if your augmentation isn’t too extensive. With ResNet-18 it might be a challenge. DALI may be of use, as you’ll likely be limited to around 2 workers per GPU (due to the logical core count of 12), which often limits GPU utilization. Offloading more to the GPU, or just being able to do the loading + processing in C++, will give you some headroom. Within Python, using an OpenCV image loading/decoding/preprocessing pipeline can reduce some bottlenecks, as it’s quite a bit faster than PIL. PIL-SIMD can help, but it’s still not as fast as OpenCV.
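An OpenCV-based Dataset looks roughly like the sketch below (the class name and the hard-coded ImageNet normalization constants are just for illustration):

import cv2
import numpy as np
import torch
from torch.utils.data import Dataset

class CvImageDataset(Dataset):
    # Decode + resize with OpenCV instead of PIL.
    def __init__(self, paths, labels, size=224):
        self.paths, self.labels, self.size = paths, labels, size
        self.mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
        self.std = np.array([0.229, 0.224, 0.225], dtype=np.float32)

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        img = cv2.imread(self.paths[idx], cv2.IMREAD_COLOR)            # BGR, HWC, uint8
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (self.size, self.size), interpolation=cv2.INTER_LINEAR)
        img = (img.astype(np.float32) / 255.0 - self.mean) / self.std  # same normalization as the PIL pipeline
        img = np.ascontiguousarray(img.transpose(2, 0, 1))             # HWC -> CHW
        return torch.from_numpy(img), self.labels[idx]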

Thank you.

I just tested the number of workers with DataParallel. Total shows the total wall time from launching the code to finishing (in minutes). Data/compute/other/backprop is the average time per execution stage in seconds per iteration (“other” is the loss + accuracy calculations, backprop is backward() + step()), ignoring the first iteration. I ran this for 200 iterations; see the code below for details. Again, this is ResNet-18 training on 4 Titan Pascal GPUs and a 6-core CPU (with hyperthreading, so 12 logical cores).

Workers   | Total |   data/compute/other/backprop
 0           5.4         1.50/0.02/0.03/0.01
 1           5.4         1.51/0.05/0.02/0.03
 2           3.0         0.78/0.05/0.03/0.02
 4           1.7         0.38/0.05/0.04/0.02
 6           1.4         0.27/0.05/0.05/0.02
 8           1.2         0.22/0.05/0.05/0.03
10           1.1         0.19/0.07/0.04/0.03
12           1.1         0.17/0.07/0.03/0.03
14           1.1         0.16/0.08/0.03/0.03
16           1.1         0.15/0.08/0.03/0.03
20           1.1         0.12/0.09/0.03/0.03
24           1.1         0.11/0.11/0.03/0.04
28           1.1         0.09/0.12/0.02/0.04
32           1.1         0.04/0.16/0.03/0.05
40           1.2         0.01/0.20/0.02/0.07
48           1.3         0.00/0.20/0.02/0.06
64           1.5         0.01/0.23/0.02/0.06
96           1.8         0.07/0.23/0.02/0.07

128 workers crashed the process (probably because of OOM; the server has 64 GB of RAM, and 96 workers consume over 40 GB).

I’m not sure how to interpret these results:
Why does the compute time change?
Why do the loss + accuracy calculations take so much time?
Why is the backprop time so short?
Is this method of measuring time valid for parallel execution on multiple GPUs?

import time

import numpy as np
import torch
import torch.nn as nn
from torchvision import datasets, transforms
from torchvision.models.resnet import ResNet, BasicBlock

def accuracy(output, target):
    with torch.no_grad():
        batch_size = target.size(0)
        pred = output.data.max(1)[1]
        acc = pred.eq(target.data).sum().item() * 100.0 / batch_size
        return acc

normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
# traindir is the path to the training images (defined elsewhere)
train_dataset = datasets.ImageFolder(traindir, transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ]))

for workers in [0, 1, 2, 4, 6, 8, 10, 12, 14, 16, 20, 24, 28, 32, 40, 48, 64, 96, 128]:
    model = ResNet(BasicBlock, [2, 2, 2, 2])  # ResNet-18
    model = torch.nn.DataParallel(model).cuda()
    criterion = nn.CrossEntropyLoss().cuda()
    optimizer = torch.optim.SGD(model.parameters(), 0.001)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=256, shuffle=True, num_workers=workers, pin_memory=True)

    model.train()
    data_times = []
    compute_times = []
    other_times = []
    backprop_times = []

    start = time.time()
    end = start
    for i, (images, target) in enumerate(train_loader):
        target = target.cuda(non_blocking=True)
        end_data = time.time()
        data_time = end_data - end                 # time spent waiting on the dataloader
        output = model(images)
        end_compute = time.time()
        compute_time = end_compute - end_data      # forward pass
        loss = criterion(output, target)
        acc = accuracy(output, target)
        end_other = time.time()
        other_time = end_other - end_compute       # loss + accuracy ("other")
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        end_backprop = time.time()
        backprop_time = end_backprop - end_other   # backward() + step()

        if i != 0:  # skip the first (warm-up) iteration
            data_times.append(data_time)
            compute_times.append(compute_time)
            other_times.append(other_time)
            backprop_times.append(backprop_time)

        end = time.time()

        if i == 200:
            ttime = end - start
            data_time = np.mean(data_times)
            compute_time = np.mean(compute_times)
            other_time = np.mean(other_times)
            backprop_time = np.mean(backprop_times)
            print('Workers {:d}  {:.1f}  {:.2f}/{:.2f}/{:.2f}/{:.2f}'.format(workers, ttime/60., data_time, compute_time, other_time, backprop_time))
            break
1 Like

Here’s the same experiment, but with pin_memory=False:

Workers   | Total |   data/compute/other/backprop
 0              5.4             1.49/0.03/0.03/0.01
 1              5.2             1.48/0.04/0.02/0.02
 2              2.6             0.70/0.04/0.03/0.02
 4              1.4             0.31/0.04/0.04/0.01
 6              1.1             0.21/0.05/0.03/0.02
 8              0.9             0.17/0.05/0.03/0.02
10              0.9             0.14/0.07/0.02/0.02
12              0.8             0.10/0.09/0.02/0.02
14              0.8             0.08/0.10/0.02/0.02
16              0.8             0.08/0.11/0.02/0.03
20              0.9             0.07/0.11/0.02/0.03
24              0.9             0.05/0.14/0.02/0.03
28              1.0             0.05/0.14/0.02/0.03
32              1.0             0.06/0.15/0.02/0.03
40              1.1             0.06/0.15/0.02/0.03
48              1.1             0.06/0.16/0.02/0.03
64              1.3             0.06/0.18/0.02/0.03
96              1.5             0.07/0.20/0.02/0.03

You need to synchronize (torch.cuda.synchronize()) before measuring time. Otherwise, due to the async nature of CUDA, the measured time is not accurate (see https://pytorch.org/docs/stable/notes/cuda.html#asynchronous-execution).

1 Like

Thank you @SimonW, I modified the code as follows:

import os
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'  # must be set as an environment variable before CUDA is initialized to have any effect

for workers in [0, 1, 2, 4, 6, 8, 10, 12, 14, 16, 20, 24, 28, 32, 40, 48, 64, 96]:
	model = ResNet(BasicBlock, [2, 2, 2, 2])  # ResNet-18
	model = torch.nn.DataParallel(model).cuda()
	criterion = nn.CrossEntropyLoss().cuda()
	optimizer = torch.optim.SGD(model.parameters(), 0.001)

	train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=256, shuffle=True,  num_workers=workers, pin_memory=False)

	model.train()
	data_times = []
	compute_times = []
	other_times = []
	backprop_times = []
	total_times = []
	torch.cuda.synchronize()
	start = time.time()
	end = start
	for i, (images, target) in enumerate(train_loader):
		target = target.cuda(non_blocking=True)
		torch.cuda.synchronize()
		end_data = time.time()
		data_time = end_data - end
		output = model(images)
		torch.cuda.synchronize()
		end_compute = time.time()
		compute_time = end_compute - end_data
		loss = criterion(output, target)
		acc = accuracy(output, target)
		torch.cuda.synchronize()
		end_other = time.time()
		other_time = end_other - end_compute
		optimizer.zero_grad()
		loss.backward()
		optimizer.step()
		torch.cuda.synchronize()
		end_backprop = time.time()
		backprop_time = end_backprop - end_other
		total_time = data_time + compute_time + other_time + backprop_time

		if i != 0:
			data_times.append(data_time)
			compute_times.append(compute_time)
			other_times.append(other_time)
			backprop_times.append(backprop_time)
			total_times.append(total_time)

		torch.cuda.synchronize()
		end = time.time()

And here are the results:

Workers   | Total |   data/compute/other/backprop
 0              5.5             1.45/0.06/0.00/0.07
 1              5.1             1.39/0.06/0.00/0.07
 2              2.6             0.64/0.07/0.00/0.08
 4              1.4             0.26/0.07/0.00/0.08
 6              1.1             0.16/0.07/0.00/0.08
 8              1.0             0.13/0.07/0.00/0.08
10              0.9             0.10/0.08/0.00/0.08
12              0.8             0.05/0.11/0.00/0.08
14              0.9             0.04/0.11/0.00/0.08
16              0.9             0.04/0.12/0.00/0.08
20              0.9             0.04/0.12/0.00/0.08
24              1.0             0.04/0.13/0.00/0.08
28              1.0             0.04/0.13/0.00/0.08
32              1.0             0.05/0.13/0.00/0.08
40              1.1             0.05/0.14/0.00/0.08
48              1.2             0.05/0.14/0.00/0.08
64              1.3             0.06/0.15/0.00/0.08
96              1.6             0.07/0.17/0.00/0.08

The “other” time is now negligible, which is expected. However, it’s still not clear why the compute time increases with the number of workers while the backprop time stays roughly constant.

1 Like

There is a way to prefetch data between CPU and GPU using cudaMemAdvise and cudaMemPrefetchAsync. I am wondering whether this has been integrated into the dataloader. I found a prefetch_factor flag in the DataLoader constructor, but I’m not sure if that is the one. If not, how can I integrate it?
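From the DataLoader docs, my understanding so far is that prefetch_factor only controls how many batches each worker process loads ahead on the host side; it does not use cudaMemAdvise/cudaMemPrefetchAsync. A sketch of what I am currently doing (with placeholder data), combining it with pinned memory and non-blocking copies to overlap the host-to-device transfer:

import torch
from torch.utils.data import DataLoader, TensorDataset

# Placeholder dataset just for illustration
dataset = TensorDataset(torch.randn(2048, 3, 64, 64), torch.randint(0, 10, (2048,)))

loader = DataLoader(dataset, batch_size=256, num_workers=4,
                    pin_memory=True, prefetch_factor=2)  # 2 batches prefetched per worker (CPU side)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
for images, target in loader:
    images = images.to(device, non_blocking=True)  # async copy from pinned host memory
    target = target.to(device, non_blocking=True)
    # ... forward / backward ...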