How to duplicate samples in a datapipe?

I have a torchdata data pipe which starts off with a very expensive process: reading a video into frames.

Ideally, I’d like to duplicate the frames from the video (when training on, e.g., a single video for overfitting) to avoid reading the video from disk each time.

I tried using .cycle to do this, but that doesn’t work.
An alternative option would be to just do a map that (for example) doubles the number of frames, but I was wondering if there is a better “recommended” solution for this type of thing.

Can you elaborate on what you mean by duplicating (i.e. what the input and output should look like)? And what makes .cycle not work for your use case?

Is it something like [0, 1, 2] -> [0, 0, 1, 1, 2, 2]? In that case, I would recommend using .flatmap(repeat_fn).
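
For example, repeat_fn could look like this (a minimal sketch, using IterableWrapper just to illustrate):

from torchdata.datapipes.iter import IterableWrapper

def repeat_fn(x):
    return [x, x]  # flatmap flattens the returned list into the output stream

pipe = IterableWrapper([0, 1, 2]).flatmap(repeat_fn)
print(list(pipe))  # [0, 0, 1, 1, 2, 2]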

Note that if you are going to shuffle later, then .cycle should work.

@nivek

My dataloaders look like this:

import functools

import torchdata.datapipes as dp
from torch.utils.data import DataLoader
from torchvision.io import VideoReader  # assuming torchvision's VideoReader here

def local_datapipe(dir):
    assert isinstance(dir, str), f"dir must be a str, got {type(dir)}"
    datapipe = dp.iter.FSSpecFileLister(dir)
    print(f"Found {len(list(datapipe))} files in {dir}")
    datapipe = datapipe.map(load_video)
    return datapipe


def load_video(path) -> VideoReader:
    print("Loading video...")
    return VideoReader(path, "video")

def video_overfit_dataloader(
    loader_dp: dp.iter.IterDataPipe, fn, single_image=False, **kwargs
):
    def repeat(N, x):
        return [x for _ in range(N)]

    # `fn` turns a video into a list of frames
    dp = loader_dp.flatmap(fn)
    if single_image:
        dp = dp.header(1)  # keep only the first frame
    dp = dp.flatmap(functools.partial(repeat, 5000))  # 5000 copies of each frame
    if not single_image:
        dp = dp.shuffle()
    dp = dp.sharding_filter()  # shard elements across DataLoader workers
    dl = DataLoader(dp, **kwargs, drop_last=True)
    assert_dl_nonempty(dl, **kwargs)
    return dl

Originally, I was doing:

def video_overfit_dataloader(
    loader_dp: dp.iter.IterDataPipe, fn, single_image=False, **kwargs
):
    def repeat(N, x):
        return [x for _ in range(N)]

    dp = loader_dp.flatmap(fn)  # `fn` turns a video into a list of frames
    if single_image:
        dp = dp.header(1)    # keep only the first frame
        dp = dp.cycle(5000)  # repeat it 5000 times
    if not single_image:
        dp = dp.shuffle()
    dp = dp.sharding_filter()  # shard elements across DataLoader workers
    dl = DataLoader(dp, **kwargs, drop_last=True)
    assert_dl_nonempty(dl, **kwargs)
    return dl

Note: I’m not sure if the above is exactly what I was doing originally, but I think it should be close.
When using .cycle, I noticed that my (single) video was being loaded many times, when really I just wanted that single frame to be repeated many times.

Since the code may not accurately reproduce what I was doing, it might be that I am using cycle correctly in the given example but was using it incorrectly before.

.cycle does indeed re-run all of the preceding operations n times. One workaround is to put .in_memory_cache(size=BUFFER_SIZE) after the expensive step, so that the repeated passes read from the cache instead of from disk. Otherwise, .flatmap() is likely a better option here.
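
For example (a rough sketch; the file name, decoder, and cache size below are placeholders):

from torchdata.datapipes.iter import IterableWrapper

loader_dp = IterableWrapper(["video_0.mp4"])  # stand-in for the file lister

def fn(path):  # stand-in for the video -> frames function
    return [f"{path}:frame{i}" for i in range(3)]

BUFFER_SIZE = 128  # cache budget (megabytes)

pipe = loader_dp.flatmap(fn)                   # expensive decode runs once
pipe = pipe.in_memory_cache(size=BUFFER_SIZE)  # decoded frames stay in memory
pipe = pipe.cycle(5000)                        # later cycles hit the cache, not the disk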

I can see how this is not the best user experience. I am opening a GitHub issue for discussion.

Does shuffle (which has a buffer) also use an in-memory cache? I assume it has to.

Elements are popped off the buffer in shuffle as soon as they are yielded, whereas the elements in in_memory_cache persist. Therefore, using the latter should mean you will not read from disk more than once, unless your cache size is too small relative to the amount of data you’d like to store.

@nivek I’m having a hard time understanding this. Presumably, for shuffling to work, it is something like

  1. Wait until the shuffle buffer is full (has, e.g., 10K elements)
  2. When the destination datapipe needs an element, pop a random element out of the buffer, and then replace it with an element from the source datapipe
  3. If the source datapipe has no more elements, just yield elements until the buffer is empty

It seems like the result of this would be that there is effectively an in-memory cache of 10K elements.
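
In pseudocode, I picture something like this (my own sketch, not the actual Shuffler code):

import random

def buffer_shuffle(source, buffer_size):
    buffer = []
    for item in source:
        if len(buffer) < buffer_size:
            buffer.append(item)  # step 1: fill the buffer
        else:
            idx = random.randrange(buffer_size)
            yield buffer[idx]    # step 2: pop a random element...
            buffer[idx] = item   # ...and replace it from the source
    random.shuffle(buffer)
    yield from buffer            # step 3: drain once the source is exhausted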

@nivek I’ve been trying to understand the cycle implementation (so I can figure out things like this myself in the future), and it doesn’t quite make sense to me. Can you give me a few pointers?

    def __iter__(self) -> Iterator[T_co]:
        i = 0
        while self.count is None or i < self.count:
            yield from self.source_datapipe
            i += 1

From reading this, it looks like it will do a yield from the source datapipe a total of count times.
The thing that is confusing is: let’s say count is 3; then I would expect cycle to yield from the source datapipe 3 times.

But if the source datapipe has a total of 3 elements, then I would expect cycle to yield 9 elements, not 3. Does my confusion here make sense?

Your description of buffer shuffle is basically accurate. As you describe in step 3, it yields elements from the buffer (i.e. it pops them). As soon as an element is yielded, it is removed from the buffer. If you want that element again, you have to re-do the previous operations, such as reading from files.

Whereas for a cache, an element is not evicted upon reading. It stays there, so you can read the same element multiple times, as long as the cache has enough space. Does that make sense?
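
A quick toy check of the difference (the print stands in for the expensive read):

from torchdata.datapipes.iter import IterableWrapper

def expensive(x):
    print(f"decoding {x}")  # stands in for the disk read / video decode
    return x

pipe = IterableWrapper([0, 1, 2]).map(expensive).in_memory_cache()
list(pipe)  # prints "decoding ..." three times while filling the cache
list(pipe)  # prints nothing: the cached elements are simply re-yielded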

Edit: Sorry, the previous description was slightly off.

The line yield from self.source_datapipe will yield 9 times in total; each time it resumes, it yields one element from the DataPipe. After 3 yields, the source DataPipe is exhausted and we move on to i += 1. In the next iteration, it resets the source DataPipe and does the same thing. Overall, it repeats count times, so 3 x 3 = 9 elements. Does that answer your question?

You can basically re-write that code snippet as:

i = 0
while self.count is None or i < self.count:
    for e in self.source_datapipe:
        yield e
    i += 1

I think your confusion might be around what happens when you call iter(datapipe) multiple times. Basically, every time iter(datapipe) is called (explicitly or implicitly), it most likely also calls iter() on its source DataPipe, hence the preceding operations need to be re-done. This saves memory, but at the cost of re-doing work. You can avoid it by using the various caching mechanisms that we provide.
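
To make that concrete (a toy sketch; the print stands in for your expensive video decode):

from torchdata.datapipes.iter import IterableWrapper

def loud(x):
    print(f"recomputing {x}")  # stands in for expensive upstream work
    return x

pipe = IterableWrapper([0, 1, 2]).map(loud).cycle(3)
print(list(pipe))  # [0, 1, 2, 0, 1, 2, 0, 1, 2]
# "recomputing ..." prints 9 times: each cycle re-invokes iter() on the
# source, so the map runs again unless a cache sits in between.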

Thanks for the detailed explanations! The yield from makes sense now (I think). Basically, it ‘passes through’ the underlying generator from the source datapipe.

In the case of cycle, we “pass through” the underlying generator “count” times.

Edit: nevermind, think I figured it out
I’m still a bit confused about the shuffle buffer vs the cache, because it just seems like the difference is one of ordering, i.e. the shuffle buffer pops out elements randomly while the cache is FIFO. I’ve been looking at the cache code and it doesn’t really make sense to me. I might try to re-implement it and see if that helps.

Thanks again for all the responses!