Gathering dictionaries of DistributedDataParallel

I am training a model to segment 3D images in a slice-by-slice fashion. To distribute over multiple GPUs I use DistributedDataParallel, and I use DistributedSampler to split the dataset across the GPUs.

During prediction of new cases I use a similar Dataset and DataLoader setup, and I can gather a dictionary like {'filename': [(slice_no_1, slice_1_pred), (slice_no_2, slice_2_pred)], ...}, which I can subsequently sort on the slice index to get an output volume. However, when I use DistributedSampler the slices are distributed across two GPUs, and I therefore end up with two dictionaries which are most likely both incomplete (each containing slices the other is missing).
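Roughly, on a single GPU the recombination looks like this (a rough sketch; names are illustrative and each prediction is assumed to be a 2D numpy array):

```python
import numpy as np

# `predictions` is the dictionary sketched above (illustrative name); each
# prediction is assumed to be a 2D numpy array (one slice).
volumes = {}
for filename, slices in predictions.items():
    slices = sorted(slices, key=lambda pair: pair[0])  # sort on slice index
    volumes[filename] = np.stack([pred for _, pred in slices], axis=0)
```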

How do I gather these two dictionaries? Since I preferably cast the predictions to a numpy array, it would be most convenient to gather them in CPU memory.


Hi @jteuwen,

I’m not sure I understand your issue, but I’ll give it a shot. You’re doing validation of your model and you’re using a distributed sampler on the validation set. This means you have a partial result on each process, and you’re looking to combine them into a single result, for the final accuracy numbers?

Or… perhaps the sampler splits the array of slices for a single filename across GPUs and you want to keep the predictions for a single filename on a single process?

Hi @pietern

No, training and validation are done on a slice-by-slice basis, while the data are 3D MRI images. My Dataset outputs a dictionary with the data, including a key indicating which file the slice belongs to and what the index of the slice is. I use the same setup for predicting new cases; however, for that I would like to recombine the data into a 3D volume again. With one GPU that is fine: when processing a complete dataset you can combine the slices based on the dictionary keys denoting the filename and the slice number.

When doing this with a DistributedSampler, you have multiple Python processes, each holding part of the dataset. In this case, even when the sampling is sequential, some of the slices can end up in one process and the rest in another. To recombine them, the process with rank 0 would need access to the dictionaries of all other processes containing the slices.

Solutions I have come up with so far:

  • Dump each slice to disk and, when done, combine them in the rank 0 process
  • Use a memory map to do the same thing (but with pickled dictionaries)
  • Use something such as Redis to store the results in. An extra bonus is that it would be easier to distribute, as we already use Redis.

However, that seems quite convoluted for a reasonably simple problem. I could change the Dataset classes and the sampler specifically for this purpose, but that has the disadvantage that (1) if I change something in the dataset / dataloader I would need to change it in two places, which is a source of bugs, and (2) it is also tricky to implement a multi-GPU model which scales well across a cluster.

I understand, thanks for the clarification.

You can use the existing torch.distributed primitives gather or all_gather to get all results to a single process or to all processes, respectively. Since you're outputting dictionaries, you can't do it with the functions in core yet and would need to serialize the dictionaries yourself. Coincidentally, @mrshenli and I were talking about adding this to core yesterday, and he created an issue to track it: https://github.com/pytorch/pytorch/issues/23232. This doesn't solve your issue right now though.
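For illustration, the manual serialize-and-gather route could look roughly like this (a hypothetical helper, not a core API; it pads the pickled payloads to a common length so all_gather can be used):

```python
import pickle

import torch
import torch.distributed as dist


def all_gather_pickled(obj, device):
    """Hypothetical helper: serialize any picklable object (e.g. a dict of
    predictions) and all_gather it across the default process group.

    Assumes the process group is initialized and `device` matches the
    backend (a CUDA device for NCCL, CPU for Gloo).
    """
    world_size = dist.get_world_size()

    # Serialize to a 1-D uint8 tensor.
    payload = torch.tensor(
        bytearray(pickle.dumps(obj)), dtype=torch.uint8, device=device
    )

    # Exchange payload sizes so every rank can pad to the maximum length.
    local_size = torch.tensor([payload.numel()], dtype=torch.long, device=device)
    sizes = [torch.zeros_like(local_size) for _ in range(world_size)]
    dist.all_gather(sizes, local_size)
    max_size = int(max(size.item() for size in sizes))

    # Pad, gather, then trim and deserialize each rank's payload.
    padded = torch.zeros(max_size, dtype=torch.uint8, device=device)
    padded[: payload.numel()] = payload
    gathered = [torch.zeros_like(padded) for _ in range(world_size)]
    dist.all_gather(gathered, padded)

    return [
        pickle.loads(tensor[: int(size.item())].cpu().numpy().tobytes())
        for tensor, size in zip(gathered, sizes)
    ]
```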

To solve this today, I would indeed write everything to a shared filesystem (if you have one) with torch.save, with files named after the rank of the process, run a barrier to ensure all writes are done, and then torch.load all of them on the process where you want to collect the results.
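A minimal sketch of that approach, assuming a shared output directory and a per-rank partial_results dictionary (both names are illustrative):

```python
import os

import torch
import torch.distributed as dist

# `partial_results` is the per-rank {'filename': [(slice_no, pred), ...]} dict;
# `output_dir` must live on a filesystem shared by all ranks (both names are
# illustrative).
rank = dist.get_rank()
world_size = dist.get_world_size()
output_dir = "/shared/predictions"
os.makedirs(output_dir, exist_ok=True)

torch.save(partial_results, os.path.join(output_dir, f"predictions_rank{rank}.pt"))

# Make sure every rank has finished writing before anyone starts reading.
dist.barrier()

if rank == 0:
    merged = {}
    for r in range(world_size):
        part = torch.load(os.path.join(output_dir, f"predictions_rank{r}.pt"))
        for filename, slices in part.items():
            merged.setdefault(filename, []).extend(slices)
    for filename in merged:
        merged[filename].sort(key=lambda pair: pair[0])  # sort by slice index
```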


Thanks for your reply - that does seem like a good addition to the code base. By the way: if I were to use the torch.distributed primitives, then since NCCL is the backend, wouldn't that transfer go through GPU memory (since NCCL does not support CPU communication)? That might be inconvenient for my use case.

That’s correct. It would require serialization on the CPU side, a copy to GPU memory to perform the collective, a copy back to CPU memory, and then deserialization. Alternatively, you can create a separate Gloo group with torch.distributed.new_group and use that.
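For example, a CPU-side gather over a Gloo group could look roughly like this (a sketch; it assumes a PyTorch version that has all_gather_object, which was added after this discussion, and partial_results is the per-rank dictionary):

```python
import torch.distributed as dist

# Side channel on CPU: a separate Gloo group next to the default NCCL group.
# Assumes the default group is already initialized.
cpu_group = dist.new_group(backend="gloo")

gathered = [None] * dist.get_world_size()
dist.all_gather_object(gathered, partial_results, group=cpu_group)
# gathered[r] now holds the dictionary produced by rank r, on every rank.
```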


@jteuwen I have a question for you. Are you loading a single 3D file that then gets sliced and passed into training a model? If so, when doing multi-GPU training via DDP, could you not run into a situation where multiple processes get different slices originating from the same 3D file and therefore try to access the file at the same time?

@solarflarefx Apologies that I missed your question.

It is indeed possible that multiple processes try to access the same volume in that way. What I do is write a BatchSampler which takes this into account and splits whole volumes over the different processes instead (using the rank); a simplified sketch is below.
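This is only an illustrative sketch of the idea, not the actual sampler; it assumes the dataset exposes a volume_indices mapping from filename to the dataset indices of its slices (an assumed attribute):

```python
import torch.distributed as dist
from torch.utils.data import Sampler


class VolumeDistributedSampler(Sampler):
    """Simplified, hypothetical sketch: assign whole volumes to ranks so
    that all slices of one file stay in one process.

    Assumes `dataset.volume_indices` is a dict mapping
    filename -> list of dataset indices of its slices (an assumed attribute).
    """

    def __init__(self, dataset, rank=None, num_replicas=None):
        self.dataset = dataset
        self.rank = dist.get_rank() if rank is None else rank
        self.num_replicas = (
            dist.get_world_size() if num_replicas is None else num_replicas
        )

    def _my_filenames(self):
        # Deterministic order on every rank, then round-robin over ranks.
        filenames = sorted(self.dataset.volume_indices)
        return filenames[self.rank :: self.num_replicas]

    def __iter__(self):
        for filename in self._my_filenames():
            yield from self.dataset.volume_indices[filename]

    def __len__(self):
        return sum(
            len(self.dataset.volume_indices[f]) for f in self._my_filenames()
        )
```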

Hi Jteuwen,

I am wondering whether you managed to resolve your issue. I have a problem similar to the one you describe here. In my case, different parts of my output are handled by different GPUs, and each GPU returns a dictionary. I expected to get all results from the GPUs so I could recombine them using the dictionary keys. However, the GPUs return incomplete results, likely because some GPUs (processes) finish their job sooner than others. I have used dist.all_gather_object(), but I still cannot resolve the problem.

You might want to add a barrier in case you need to synchronize all processes.
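For example (local_results being the per-rank dictionary, an illustrative name; note that all_gather_object is itself a blocking collective, so every rank must reach it after producing all of its results):

```python
import torch.distributed as dist

# With the NCCL backend, the correct CUDA device must be set on each rank,
# e.g. torch.cuda.set_device(local_rank), before calling the collectives.
dist.barrier()

gathered = [None] * dist.get_world_size()
dist.all_gather_object(gathered, local_results)

if dist.get_rank() == 0:
    merged = {}
    for part in gathered:
        for filename, slices in part.items():
            merged.setdefault(filename, []).extend(slices)
```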