Communicating with DataLoader workers


I am having some issues with how the dataloader works when multiple workers are used.

In my dataset, I resize the images to the input dimensions of the network.
I am training a fully convolutional network and I can thus change the input dimension of my network in order to make it more robust whilst training.
The way I have been doing this variable resizing is by passing a reference to my network object to my resize transformer object. This way, the resize object can check network.input_dimension and resize the data accordingly!

Now, what I had failed to understand when I coded it like this, is that using multiple workers for data loading spawns new processes. This means that your dataset, and thus also your transformer objects, get copied into these new processes. Because my resize transformer holds a reference to my network object, the network gets copied into every worker process as well. So when I change the input dimension in my main process, nothing changes in the worker processes: their copies of the network keep the old input dimensions.
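The copying behaviour can be reproduced outside PyTorch with a minimal sketch (the Resize class and names below are illustrative stand-ins, not my actual code). The child process gets a snapshot of the object at start() time, so a later attribute change in the parent never reaches it:

```python
import multiprocessing as mp

# fork context: the child gets a copy of the parent's memory at start()
# time; POSIX-only, used here to keep the demo deterministic
_ctx = mp.get_context("fork")

class Resize:
    """Stand-in for the resize transformer; dim is a plain attribute."""
    def __init__(self, dim):
        self.dim = dim

def _report_dim(resize, q):
    # runs in the worker: sees whatever was copied at process start
    q.put(resize.dim)

def dim_seen_by_worker(resize):
    q = _ctx.Queue()
    p = _ctx.Process(target=_report_dim, args=(resize, q))
    p.start()
    resize.dim = 416        # change the size in the main process only
    seen = q.get()
    p.join()
    return seen

if __name__ == "__main__":
    print(dim_seen_by_worker(Resize(320)))  # 320: the worker kept its copy
```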

Now I want to have only a width and height variable in my resize object, because copying the entire network was stupid and useless. But I don't know how to tell my worker processes that the input dimensions have changed!

Any help is appreciated!

This is the code for spawning the workers.

self.workers = [
    multiprocessing.Process(
        target=_worker_loop,
        args=(self.dataset, self.index_queue, self.data_queue, self.collate_fn))
    for _ in range(self.num_workers)]

So if I add self.input_dim = multiprocessing.Array('i', (width, height)) to my dataset, that variable will be shared?

Hmm, tried adding the multiprocessing.Array, with no luck so far… :sweat:

mp.Array should work in theory… What behavior are you seeing?

I just realized I might have made a silly mistake…
I created the mp.Array, but when changing the dimensions, I just assigned a new (width, height) tuple.
I think I have to assign the elements instead, like var[0] = w; var[1] = h!

I will try this out on Monday! :slight_smile:

What I’m still wondering though, is whether mp.Array works when it is an attribute of an object… We’ll see!

Yeah, you definitely need to assign data as elements of the array. It should work as an attribute of an object.
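Both points can be sketched in one small example (the ResizeTransform class and helper names are illustrative, not from the actual code). The shared mp.Array lives as an attribute of the object, and the parent mutates it element-wise after the worker has started; rebinding the attribute to a fresh tuple would only be visible in the parent:

```python
import multiprocessing as mp

_ctx = mp.get_context("fork")  # POSIX-only; keeps the sketch deterministic

class ResizeTransform:
    """Resize stand-in that keeps its target size in shared memory."""
    def __init__(self, width, height):
        self.input_dim = _ctx.Array('i', (width, height))

    def set_dim(self, width, height):
        # mutate the shared buffer element-wise; assigning a new tuple to
        # self.input_dim would only rebind it in the current process
        self.input_dim[0] = width
        self.input_dim[1] = height

def _report_dim(transform, go, q):
    go.wait()  # block until the parent has updated the size
    q.put(tuple(transform.input_dim))

def shared_dim_demo():
    t = ResizeTransform(416, 416)
    go, q = _ctx.Event(), _ctx.Queue()
    p = _ctx.Process(target=_report_dim, args=(t, go, q))
    p.start()
    t.set_dim(320, 320)  # change the shared size after the worker started
    go.set()
    seen = q.get()
    p.join()
    return seen

if __name__ == "__main__":
    print(shared_dim_demo())  # (320, 320): the worker saw the update
```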

@SimonW, thank you for your help, it works as expected!! :smile:


So, communicating with workers through mp.Array works fine, but now I am having synchronization issues.

Because the DataLoaderIter already puts the indices for the next batch in the queue while fetching the current one, some of the images have already been loaded by the time I change the input dimensions. Those images then have a different size, so I cannot collate them into a batch.

I also noticed PyTorch is changing the DataLoader in master to have a separate manager thread. Maybe there could be a way to reset the last batch? Or at least a synchronized way to communicate with the workers.
You could pass a function to the DataLoader and have the DataLoaderIter call it before sending the indices for the next batch.

I am not sure whether my problem is general enough to warrant a change in the PyTorch code, or whether I have to find another workaround. Any help to fix this issue would be really appreciated!

Unfortunately, the new manager thread doesn’t help in this use case (I wrote that). But another way you can solve this is to put the resize logic in a custom collate function. Instead of having the dataset’s __getitem__ method check the mp.Array value, let your collate_fn check it, resize, and then collate into a batch tensor :). Since it operates on a whole batch, it should work fine.
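This suggestion might look roughly like the sketch below. To keep it self-contained I use NumPy arrays with a crude nearest-neighbour resize rather than torch tensors and torch.nn.functional.interpolate, which is what you would use in practice; make_resize_collate and the (image, label) sample layout are my own illustrative assumptions:

```python
import numpy as np

def make_resize_collate(shared_dim):
    """Build a collate_fn that resizes each sample to the current size.

    shared_dim is any indexable (width, height) pair, e.g. the shared
    mp.Array discussed above, read at collate time in the worker.
    """
    def collate(batch):
        w, h = shared_dim[0], shared_dim[1]
        resized = []
        for img, _ in batch:
            ih, iw = img.shape[:2]
            # nearest-neighbour index maps for rows and columns
            rows = np.arange(h) * ih // h
            cols = np.arange(w) * iw // w
            resized.append(img[rows][:, cols])
        # every sample now has the same size, so stacking works
        return np.stack(resized), [label for _, label in batch]
    return collate

if __name__ == "__main__":
    dim = [320, 320]  # stand-in for the shared mp.Array
    collate = make_resize_collate(dim)
    batch = [(np.zeros((416, 416, 3)), 0), (np.zeros((208, 208, 3)), 1)]
    images, labels = collate(batch)
    print(images.shape)  # (2, 320, 320, 3)
```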

Writing this for future people stumbling upon this issue.

Whilst the solution proposed by @SimonW would certainly work, I decided to solve my problem by using a custom BatchSampler, as suggested by @apaszke on github.
The reason being that this solution allows me to use any collate function, sampler, etc. that I want at run-time.

My CustomBatchSampler inherits from the default PyTorch BatchSampler and overrides the __iter__ method.
In that method, I call:

for batch in super(CustomBatchSampler, self).__iter__():
  yield [(self.input_dim, idx) for idx in batch]
  # compute new input dimension for the next batch

This works, but is not super user-friendly, as you would have to remember to create your own CustomBatchSampler object and pass it to the DataLoader.
So I decided to copy the PyTorch DataLoader class and modify it to use my CustomBatchSampler. This means that in my code, I just use this custom DataLoader and can still use any sampler, or use the shuffle, batch_size, ... arguments to automatically select one.
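For future readers, here is a rough sketch of the sampler idea. The BatchSampler class below is a minimal stand-in mirroring the interface of torch.utils.data.sampler.BatchSampler so the example runs without PyTorch; the dims list and the details of CustomBatchSampler are illustrative assumptions, not the exact code. Each batch index is paired with the input size chosen for that batch, so indices that are already queued keep the size they were issued with, and the dataset's __getitem__ receives (input_dim, index) tuples:

```python
import random

class BatchSampler:
    """Minimal stand-in for torch.utils.data.sampler.BatchSampler."""
    def __init__(self, sampler, batch_size, drop_last=False):
        self.sampler = sampler
        self.batch_size = batch_size
        self.drop_last = drop_last

    def __iter__(self):
        batch = []
        for idx in self.sampler:
            batch.append(idx)
            if len(batch) == self.batch_size:
                yield batch
                batch = []
        if batch and not self.drop_last:
            yield batch

class CustomBatchSampler(BatchSampler):
    """Yields (input_dim, index) pairs and picks a new input size only
    after a batch has been handed out, so in-flight indices keep theirs."""
    def __init__(self, sampler, batch_size, drop_last=False,
                 dims=((320, 320), (416, 416), (608, 608))):
        super().__init__(sampler, batch_size, drop_last)
        self.dims = dims
        self.input_dim = dims[0]

    def __iter__(self):
        for batch in super().__iter__():
            yield [(self.input_dim, idx) for idx in batch]
            # choose the input size for the *next* batch
            self.input_dim = random.choice(self.dims)

if __name__ == "__main__":
    for batch in CustomBatchSampler(range(6), batch_size=2):
        print(batch)  # e.g. [((320, 320), 0), ((320, 320), 1)]
```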

Thank you all for helping me solve this issue!