How to use DataLoader for ReplayBuffer

In RL, the data is not static but keeps growing due to new samples explored by the agent.

I would like to use DataLoader for preparing/loading data from a replay buffer more efficiently. However, it seems that the concept of DataLoader is not well designed for non-stationary data.

So, what would be the best way to extract/load/transform data from a large replay buffer efficiently?

Thanks.

3 Likes

Bringing this thread up.

Did you come up with a good solution? I am running into the same problem… The dataloader is usable if you set the number of workers to 0, but that defeats the actual usefulness of a dataloader.

At the moment I am just sampling from large tensors, so I do not have a good solution. I am thinking of using Ray and starting the batch collection process that way.

IterableDataset was introduced recently, which could be a good candidate for your use case.
I’m not deeply familiar with RL, but based on the descriptions in this topic, it seems the map-style datasets have some limitations in RL setups.

Thank you for your suggestion. As a rough overview of what we would need for RL is to sample from a growing dataset. For every batch, newly added samples in the dataset should be considered as well.

The problem I had with the regular map-style approach was that setting the number of workers to > 0 spawns new processes which seem to receive a deep copy of the sampler. The sampler that they received assumes the dataset size to be constant, which does not work for RL. I thought of somehow fixing this by writing my own sampler, but I do not know if this is going to work. Concretely, I get the error:

    return iter(torch.randint(high=n, size=(self.num_samples,), dtype=torch.int64).tolist())
RuntimeError: invalid argument 2: max must be greater than min, but got: min = 0, max = 0 at /pytorch/aten/src/TH/generic/THTensorRandom.cpp:43

So the problem is that self.num_samples stays at 0. Is there a way to let the sampler update its self.num_samples? Or could a custom sampler maybe access the len method of the current dataset?

I also looked into the IterableDataset and do not yet quite see how it would be used for RL. It seems to me that the dataset that will be passed to the worker subprocesses will not update their length, but I haven’t tried it yet. I am thinking that maybe I could modify the iter method to return the actual length of the current dataset? But then the dataset would only return the indices and obviously I want it to return the states, actions etc and collate them…

1 Like

Edit: Do not use this! I messed up. Because I did not specify the Dataset to be an IterableDataset, the iter method was not used. Instead it simply iterated over indices in the dataset instead of sampling random indices.
I still cannot find a way to make it efficiently run with a dataloader. Multiprocessing works, but is 8x slower with 2 workers and 16x slower with 8 workers.

@zer0ne @ptrblck I finally managed to get it working. One can use the standard Dataset (not the IterableDataset!) by implementing the methods:

def __getitem__(self, key):
    pass
    # return single transition: state, action, reward, next_state, done

and

    def __iter__(self):
        while True:
            idx = random.randint(0, len(self) - 1)
            yield self[idx]

Then we need to instantiate this class and create a dataloader based off it. The dataloader can then be used in this way:

    def sample(self):
        try:
            out = next(self.iter)
        except StopIteration:
            self.iter = iter(self.dataloader)
            out = next(self.iter)
        return out

This will work for any number of workers!
Of course a custom collate_fn needs to be passed into the Dataloader creation.

1 Like

Hello,

I am basically trying to do the same thing, with even more difficulty because I want the collated output to live on CUDA, which is tricky with multiprocessing according to the documentation.

Have you found a way to get this working by any chance?