Communicating with Dataloader workers

Hey,

I am having some issues with how the dataloader works when multiple workers are used.

In my dataset, I resize the images to the input dimensions of the network.
I am training a fully convolutional network and I can thus change the input dimension of my network in order to make it more robust whilst training.
The way I have been doing this variable resizing is by passing a reference to my network object to my resize transformer object. This way, the resize object can check network.input_dimension and resize the data accordingly!

Now, what I had failed to understand correctly when I coded it like this, is that when you are using multiple workers for your data loading, it spawns new processes. This means that your dataset, and thus also your transformer objects, get copied to these new processes. Because my resize transformer has a reference to my network object, this network object will also be copied to every worker-process. This eventually means that when I change the input dimension in my main process, nothing changes in the worker processes (their reference to network didn’t change input dimensions).

Now I want to only have a width and height variable in my resize object, because copying the entire network was stupid and useless. But I don’t know how I can communicate with my worker processes to tell them the input dimensions have changed!

Any help is appreciated!

EDIT 1:
This is the code for spawning the workers.

self.workers = [
    multiprocessing.Process(
        target=_worker_loop,
        args=(self.dataset, self.index_queue, self.data_queue, self.collate_fn))
    for _ in range(self.num_workers)]

So if I add self.input_dim = multiprocessing.Array('i', (width, height)) to my dataset, that variable will be shared?

EDIT 2:
Hmm, tried adding the multiprocessing.Array, with no luck so far… :sweat:

mp.Array should work in theory… What behavior are you seeing?

I just realized I might have made a silly mistake…
I created the mp.Array, but when changing the dimensions, I just assigned a new (width, height) tuple.
I think I have to assign it element by element, like var[0] = w; var[1] = h, instead!

I will try this out on Monday! :slight_smile:

What I’m still wondering, though, is whether mp.Array works when it is an attribute of an object… We’ll see!

Yeah, you definitely need to assign data as elements of the array. It should work as attribute of an object.
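For anyone who wants to see the difference, here is a minimal sketch of the in-place assignment, with the array stored as an attribute of an object. The Resize class, its set_dim/get_dim methods and the demo function are made up for illustration. It also assumes a POSIX system where the "fork" start method is available, so it is not a drop-in for every setup.

```python
import multiprocessing as mp

# Assumption: a POSIX system where the "fork" start method is available.
CTX = mp.get_context("fork")


class Resize:
    """Hypothetical resize transform; only the (width, height) pair is shared."""

    def __init__(self, width, height):
        # 'i' = C int; the two values live in shared memory.
        self.input_dim = CTX.Array("i", (width, height))

    def set_dim(self, width, height):
        # Write into the existing buffer. Rebinding the attribute, e.g.
        # self.input_dim = (width, height), would only affect this process.
        with self.input_dim.get_lock():
            self.input_dim[0] = width
            self.input_dim[1] = height

    def get_dim(self):
        with self.input_dim.get_lock():
            return tuple(self.input_dim[:])


def _report(resize, go, out):
    go.wait()                  # wait until the parent has changed the size
    out.put(resize.get_dim())  # read through the child's copy of the object


def demo():
    resize = Resize(416, 416)
    go, out = CTX.Event(), CTX.Queue()
    child = CTX.Process(target=_report, args=(resize, go, out))
    child.start()              # the child now has its own copy of `resize`
    resize.set_dim(608, 608)   # in-place update made in the parent
    go.set()
    seen = out.get(timeout=10)
    child.join(timeout=10)
    return seen
```

Calling demo() returns the size as seen by the child process, which should match what the parent wrote after the child was started.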

@SimonW, thank you for your help, it works as expected!! :smile:

So, communicating with workers through mp.Array works fine, but now I am having synchronization issues.

Because the DataLoaderIter puts the next indices in the Queue when fetching the current batch, this means some of the images are already loaded when I change the input dimensions. This means I cannot collate the images in a batch, because some of them have a different size.

I also noticed PyTorch is changing the DataLoader in master to have a separate manager thread. Maybe there could be a way to reset the last batch? Or at least a synchronized way to communicate with the workers.
You could pass a function to the DataLoader and have the DataLoaderIter call it before sending the indices for the next batch.

I am not sure whether my problem is general enough to add this change to the PyTorch code or if I have to find another workaround for my problem? Any help would be really appreciated to fix this issue!!

Unfortunately, the new manager_thread doesn’t help in this use case (I wrote that). But the other way you can solve this is to write the resize logic in a custom collate function. Instead of having the dataset getitem method checking the mp.Array value, let your collate_fn do it and resize, then collate in a batch tensor :). Since it operates on a whole batch, it should work fine.
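A rough sketch of what such a collate function could look like, assuming each sample is a (float image tensor of shape C x H x W, integer label) pair and that the target size lives in a shared array holding (width, height). make_resizing_collate and shared_dim are hypothetical names, and bilinear interpolation is just one possible choice of resize.

```python
import multiprocessing as mp

import torch
import torch.nn.functional as F


def make_resizing_collate(input_dim):
    """Build a collate_fn that resizes a whole batch to one common size.

    input_dim: indexable holding (width, height), e.g. an mp.Array
    that the main process updates in place.
    """
    def collate(batch):
        # Read the size once, so every sample in this batch agrees on it.
        width, height = input_dim[:]
        images = [
            F.interpolate(img.unsqueeze(0), size=(height, width),
                          mode="bilinear", align_corners=False).squeeze(0)
            for img, _ in batch
        ]
        labels = torch.tensor([label for _, label in batch])
        return torch.stack(images), labels

    return collate


# Hypothetical setup: the main process owns the shared size.
shared_dim = mp.Array("i", (416, 416))  # (width, height)
collate_fn = make_resizing_collate(shared_dim)
```

The resulting collate_fn would be passed as DataLoader(..., collate_fn=collate_fn). Because the size is read once per batch, a change that lands while a batch is being assembled cannot produce mixed sizes inside that batch.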

Writing this for future people stumbling upon this issue.

Whilst the solution proposed by @SimonW would certainly work, I decided to solve my problem by using a custom BatchSampler, as suggested by @apaszke on github.
The reason being that this solution allows me to use any collate function, sampler, etc. that I want at run-time.

My CustomBatchSampler inherits from the default PyTorch BatchSampler and overrides the __iter__ method.
In that method, I call:

for batch in super(CustomBatchSampler, self).__iter__():
    # every index in the batch carries the same input dimension
    yield [(self.input_dim, idx) for idx in batch]
    # compute new input dimension
    ...

This works, but is not super user-friendly as you would have to remember to create your own CustomBatchSampler object and use that in the DataLoader.
So I decided to copy the PyTorch DataLoader class, and modify it to use my CustomBatchSampler. This means that in my code, I just use this custom DataLoader and can use any sampler, or use the shuffle, batch_size, ... variables to automatically select a sampler.
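For future readers, a self-contained sketch of the batch sampler idea could look like the following. This is not the exact code from the post above: DimBatchSampler, ToyDataset and the fixed dims schedule are made up for illustration, and the real version computes the next input dimension in its own way.

```python
import torch
from torch.utils.data import BatchSampler, DataLoader, Dataset, SequentialSampler


class DimBatchSampler(BatchSampler):
    """Attach one (width, height) pair to every index of a batch."""

    def __init__(self, sampler, batch_size, drop_last, dims):
        super().__init__(sampler, batch_size, drop_last)
        self.dims = dims  # hypothetical schedule of input dimensions

    def __iter__(self):
        for step, batch in enumerate(super().__iter__()):
            # Pick the size per batch, so all samples in it agree.
            dim = self.dims[step % len(self.dims)]
            yield [(dim, idx) for idx in batch]


class ToyDataset(Dataset):
    """Stand-in dataset: returns a constant image of the requested size."""

    def __len__(self):
        return 6

    def __getitem__(self, item):
        # The sampler sends (dimension, index) instead of a bare index.
        (width, height), idx = item
        return torch.full((3, height, width), float(idx))


dataset = ToyDataset()
batch_sampler = DimBatchSampler(SequentialSampler(dataset), batch_size=2,
                                drop_last=False, dims=[(32, 32), (48, 64)])
loader = DataLoader(dataset, batch_sampler=batch_sampler)
```

Since the dimension travels with the indices, the workers need no shared state at all, and the default collate function can stack each batch as usual.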

Thank you all for helping me solve this issue!

I looked everywhere and worked on this problem for a few days before solving it. Basically, data structures instantiated using the multiprocessing package do not behave reliably. For example, things break predictably when you nest shared memory structures within other nested shared memory structures. The ideal way to have asynchronous communication between PyTorch dataloader workers is to use process Queues, which shuttle active child process state information to the next active worker which then in turn shuttles new information to the next. Queues are certainly not elegant but can be made far less prone to breaking parallel processes, as indicated by the torch dev team.
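To make the Queue idea a bit more concrete, here is a bare-bones sketch using plain multiprocessing rather than an actual DataLoader. The worker and demo functions and the None sentinel are my own choices, and the sketch assumes a POSIX system with the "fork" start method. Note that a message put on a Queue is consumed by exactly one process, so with several workers you would likely need one queue per worker.

```python
import multiprocessing as mp


def worker(updates, results):
    """Child process: apply each state update sent by the main process."""
    while True:
        msg = updates.get()   # block until the main process sends something
        if msg is None:       # sentinel (an assumption of this sketch): stop
            break
        results.put(("resized to", msg))


def demo():
    # Assumption: POSIX system where the "fork" start method is available.
    ctx = mp.get_context("fork")
    updates, results = ctx.Queue(), ctx.Queue()
    proc = ctx.Process(target=worker, args=(updates, results))
    proc.start()
    seen = []
    for dim in [(320, 320), (416, 416)]:
        updates.put(dim)                      # send the new state
        seen.append(results.get(timeout=10))  # wait for the acknowledgement
    updates.put(None)
    proc.join(timeout=10)
    return seen
```

Nothing is shared here; state only ever moves as messages, which is what makes this pattern harder to break.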

Unfortunately, this was not an option for me given how my code base already stored this kind of information, but if you can modify your code to accept information that is popped from Queue, then you certainly should. My problem was actually quite similar to the one described in the later parts of this post, but I ultimately just wanted to share my solution with the community because of all the trouble reading/writing to shared memory from PyTorch dataloaders caused me (I have cross referenced this to other posts that were similar as well). This is specific to my implementation so you may run into instances where this is still unstable/causes deadlocks. Also keep in mind that I am only including relevant steps for brevity. This was tested in a code base using DP and NOT DDP.

In my application, I needed to use a dynamic probability vector for each parent sample that was being loaded by a worker through its respective sampled PyTorch dataloader ID to retrieve instances that were linked to the batched parent sample (this is for MIL). The issue I was facing was that each worker would use its own probability distribution that it modified within its own state, causing significant overfitting on even larger datasets due to overlapping instance sampling. This also cannot be remedied easily using PyTorch’s default/custom sampler or batch_sampler (although samplers run in the main process, they will not natively sync worker states, making it very difficult to use them to sub-sample data like this), so it unfortunately required me to use shared memory structures, which should generally be AVOIDED as I mentioned previously.

Keep in mind that my solution requires shared memory tensors rather than other shared memory data structures, as I have found that tensors result in more predictable child process behavior (i.e. you will have to convert your structs if they are lists, np arrays, etc.). You can nest these shared memory tensors within nested local memory structures; this did not cause any issues in my experiments. You also have to use reentrant locks (i.e. torch.multiprocessing.Manager().RLock()), because you will induce deadlocks if you use either mutex locks (i.e. torch.multiprocessing.Manager().Lock(), which won't allow a worker to re-enter a lock it already holds on a resource, therefore deadlocking) or no locks at all.
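The difference between the two lock types can be checked with a few lines. Note that this sketch uses the plain multiprocessing.Lock and multiprocessing.RLock classes instead of the Manager() proxies mentioned above, and can_reacquire is a made-up helper name.

```python
import multiprocessing as mp


def can_reacquire(lock):
    """Return True if the current holder can take the same lock again."""
    lock.acquire()
    try:
        # Non-blocking second attempt, so a plain lock cannot hang here.
        got_it = lock.acquire(False)
        if got_it:
            lock.release()
        return got_it
    finally:
        lock.release()


reentrant = can_reacquire(mp.RLock())  # the holder may re-enter
plain = can_reacquire(mp.Lock())       # the second acquire is refused
```

With a blocking acquire, the plain lock case would wait forever on itself, which is the kind of self-deadlock described above.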

The four key steps I used are listed here:

Step 1:

# use multiprocessing manager, import within your custom PyTorch Dataset file 
# DONT import the lock directly as it can be buggy
from torch.multiprocessing import Manager

Step 2:

# set the main reentrant lock in __init__
self.main_lock = Manager().RLock()

Step 3:

# use the share_memory_() method only on the tensor you want to share in __init__
# or pass it from __main__ and store it as a class variable in __init__
# AVOID USING NESTED SHARED MEMORY STRUCTURES CONTAINING OTHER SHARED MEMORY STRUCTURES
self.input_2d_probs_dicts[j][key] = torch.tensor(
    self.input_2d_probs_dicts[j][key], dtype=torch.float32).share_memory_()

Step 4:

# perform your operations as efficiently as possible using the lock in __getitem__
# if your processes are not efficient, you will either destroy the benefits of multiple workers or induce deadlocks
# take care to do this in one concerted step if possible to avoid mismatching between workers and to reduce the number of times workers get locked out
with self.main_lock:
    # quickly store the shared tensor values locally, do this instead of accessing the shared memory struct repeatedly!
    compartment_list[1][:] = self.input_2d_probs_dicts[comp_source][case_id][:]
    # get the nonzeros for later
    non_zero = torch.count_nonzero(compartment_list[1]).item()
    # check to see if we dont have enough nonzero weights
    if non_zero < self.STACK_2D[i]:
        # quickly reset the shared mem tensor, then store data locally
        self.input_2d_probs_dicts[comp_source][case_id][:] = self.input_2d_probs_dicts_uniform[comp_source][case_id][:]
        compartment_list[1][:] = self.input_2d_probs_dicts[comp_source][case_id][:]
    # sample the cases, done using torch.multinomial which is not ideal given our compute constraints
    stack_id[i], compartment_id[i] = self._stack_sampling(compartment_list, i)
    # quickly suppress the target probabilities in the shared tensor
    self.input_2d_probs_dicts[comp_source][case_id][compartment_id[i]] = 0
    # this code assumes you always have more instances than you are sampling for, which is why we dont reset sampling probabilities before releasing the lock

This worked reliably for me, allowing me to read and write to the shared data structure directly from the dataloader worker processes without deadlocks. All of my tests showed that the workers were using the same tensor data, without irregularities. Keep in mind these probability vectors are never seen by the GPUs. If you are using this sort of multiprocessing to interact with your GPUs in DDP, you will need to call torch.multiprocessing.set_start_method('spawn') exactly once, at the beginning of your __main__ file. I believe spawn is the default on Windows; my tests were done on Linux, which defaults to fork. You may also encounter issues with shared memory in DDP. In my use case, all of the shared memory structures were used exclusively within the worker instances.

@ptrblck I have seen you reply to MANY posts regarding this problem. I think this is the best solution if you are forced to read and write to shared memory in a PyTorch dataloader child process without using a Queue. It seems to work much more reliably than torch.multiprocessing.Array(), torch.multiprocessing.Value(), torch.multiprocessing.Manager().dict() and torch.multiprocessing.Manager().list(), all of which I tried both with and without locks. It is not immediately obvious to me why that may be, so perhaps this is something that should be directed to the PyTorch dev team. This may help others in the future as well. I’m just glad I got it to work well, I really did not want to reinvent the wheel in my code base! :sweat_smile:

Excuse all the edits, wanted to make sure it was a very clear and high effort post! :smile:
