I have a torchdata data pipe which starts off with a very expensive process: reading a video into frames.
Ideally, I’d like to duplicate the frames from the video (when training on, e.g., a single video for overfitting) to avoid reading the video from disk each time.
I tried using .cycle to do this, but that doesn’t work.
An alternative option would be to just do a map that (for example) doubles the number of frames, but I was wondering if there is a better “recommended” solution for this type of thing.
def local_datapipe(dir):
    assert isinstance(dir, str), f"dir must be a str, got {type(dir)}"
    datapipe = dp.iter.FSSpecFileLister(str(dir))
    print(f"Found {len(list(datapipe))} files in {dir}")
    datapipe = datapipe.map(load_video)
    return datapipe

def load_video(path) -> VideoReader:
    print("Loading video...")
    return VideoReader(path, "video")
def video_overfit_dataloader(
    loader_dp: dp.iter.IterDataPipe, fn, single_image=False, **kwargs
):
    def repeat(N, x):
        return [x for _ in range(N)]

    # `fn` turns a video into a list of frames
    datapipe = loader_dp.flatmap(fn)
    if single_image:
        datapipe = datapipe.header(1)
        datapipe = datapipe.flatmap(functools.partial(repeat, 5000))
    if not single_image:
        datapipe = datapipe.shuffle()
    datapipe = datapipe.sharding_filter()
    dl = DataLoader(datapipe, **kwargs, drop_last=True)
    assert_dl_nonempty(dl, **kwargs)
    return dl
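For illustration, here is a minimal pure-Python sketch of why the flatmap-based repeat pays the expensive load only once (no torchdata here; `expensive_load`, `repeat`, and the frame names are made-up stand-ins):

```python
def expensive_load(path, counter):
    # Stands in for decoding a video from disk; counts how often it runs.
    counter["loads"] += 1
    return [f"{path}-frame{i}" for i in range(3)]

def repeat(n, x):
    return [x for _ in range(n)]

counter = {"loads": 0}
frames = expensive_load("video.mp4", counter)            # flatmap(fn)
first = frames[:1]                                       # header(1)
repeated = [y for x in first for y in repeat(5000, x)]   # flatmap(repeat, 5000)
# The disk read happened once; the 5000 copies are cheap in-memory duplicates.
```

Because `repeat` duplicates an already-decoded frame, nothing upstream is re-executed, which is exactly what `.cycle` does not guarantee.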
Originally, I was doing:
def video_overfit_dataloader(
    loader_dp: dp.iter.IterDataPipe, fn, single_image=False, **kwargs
):
    def repeat(N, x):
        return [x for _ in range(N)]

    datapipe = loader_dp.flatmap(fn)
    if single_image:
        datapipe = datapipe.header(1)
        datapipe = datapipe.cycle(5000)
    if not single_image:
        datapipe = datapipe.shuffle()
    datapipe = datapipe.sharding_filter()
    dl = DataLoader(datapipe, **kwargs, drop_last=True)
    assert_dl_nonempty(dl, **kwargs)
    return dl
Note: I’m not sure if the above is exactly what I was doing originally, but I think it should be close.
When using .cycle, I noticed that my (single) video was being loaded many times, when really I just wanted that single frame to be repeated many times.
Note: I’m not sure the code accurately reproduces what I was doing, so it might be that I’m using cycle correctly in the given example but was using it incorrectly before.
.cycle does indeed re-run all of the preceding operations n times. One workaround is to put .in_memory_cache(size=BUFFER_SIZE) after it; otherwise, .flatmap() is likely the better option here.
I can see how this is not the best user experience. I am opening a GitHub issue for discussion.
Elements are popped off the buffer in shuffle as soon as they are yielded, whereas the elements in in_memory_cache persist. Therefore, using the latter should mean you will not be reading from disk more than once, unless your cache size is too small relative to the amount of data you’d like to store.
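To make the buffer-vs-cache distinction concrete, here is a simplified pure-Python sketch (not the actual torchdata implementations; `source` and `buffered_shuffle` are stand-ins): a shuffle-style buffer drains as it yields, so a second epoch re-runs the upstream work, while a cache that keeps its elements does not.

```python
import random

def source(counter):
    # Stands in for the expensive upstream pipeline (e.g. video decoding).
    counter["reads"] += 1
    yield from range(5)

def buffered_shuffle(it, buffer_size=3):
    # Elements are popped off the buffer as soon as they are yielded.
    buf = []
    for x in it:
        buf.append(x)
        if len(buf) >= buffer_size:
            yield buf.pop(random.randrange(len(buf)))
    while buf:
        yield buf.pop(random.randrange(len(buf)))

counter = {"reads": 0}
for _ in range(2):                         # two "epochs"
    epoch = list(buffered_shuffle(source(counter)))
reads_without_cache = counter["reads"]     # upstream re-ran once per epoch

counter = {"reads": 0}
cache = list(source(counter))              # in-memory cache: elements persist
for _ in range(2):
    epoch = list(cache)                    # re-reading costs nothing upstream
reads_with_cache = counter["reads"]        # upstream ran only once
```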
@nivek I’ve been trying to understand the cycle implementation so I can figure out things like this in the future and it doesn’t quite make sense to me. Can you give me a few pointers?
def __iter__(self) -> Iterator[T_co]:
    i = 0
    while self.count is None or i < self.count:
        yield from self.source_datapipe
        i += 1
From reading this, it looks like it will do a yield from the source data pipe a total of count times.
The thing that is confusing is, let’s say count == 3, then I would expect cycle to yield from the source data pipe 3 times.
But, if the source data pipe has a total of 3 elements then I would expect cycle to yield 9 elements, not 3. Does my confusion here make sense?
Your description of the shuffle buffer is basically accurate. As you describe in step 3, it yields an element from the buffer (i.e. it pops). As soon as the element is yielded, it is removed from the buffer. If you want that element again, you have to re-do the previous operations, such as reading from files.
Whereas for a cache, an element is not evicted upon reading. It stays there, such that you can read the same element multiple times without eviction as long as it has enough space. Does that make sense?
Expanded, yield from self.source_datapipe is equivalent to the loop below. The inner yield runs 9 times in total, each time producing one element from the DataPipe. After 3 yields, the source DataPipe is exhausted and we move on to i += 1. In the next iteration, it resets the source DataPipe and does the same thing. Overall, it repeats count times, so 3 x 3 = 9 elements. Does that answer your question?
i = 0
while self.count is None or i < self.count:
    for e in self.source_datapipe:
        yield e
    i += 1
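The same behaviour can be checked with a tiny stand-alone generator (plain Python, mirroring the loop above; TinySource is a made-up stand-in for a DataPipe): with count == 3 and a 3-element source, 9 elements come out, because iter() is implicitly called on the source again at the start of each cycle.

```python
class TinySource:
    def __iter__(self):
        # A fresh iterator each time iter() is called, like a DataPipe.
        return iter([1, 2, 3])

def cycle(source, count):
    i = 0
    while count is None or i < count:
        yield from source  # implicitly calls iter(source) again each cycle
        i += 1

elements = list(cycle(TinySource(), 3))
# 3 cycles x 3 elements = 9 elements
```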
I think your confusion might be around what happens when you call iter(datapipe) multiple times. Basically, every time iter(datapipe) is called (explicitly or implicitly), it most likely also calls iter() on its source DataPipe, hence the upstream operations need to be re-done. This saves memory, but at the cost of re-doing those operations. You can avoid this by using the various caching mechanisms that we provide.
Thanks for the detailed explanations! The yield from makes sense now (I think). Basically, it ‘passes through’ the underlying generator from the source datapipe.
In the case of cycle, we “pass through” the underlying generator “count” times.
I’m still a bit confused about the shuffle buffer vs. cache, because it seems like the difference is just one of ordering: the shuffle buffer pops out elements randomly while the cache is FIFO. I’ve been looking at the cache code and it doesn’t really make sense to me. I might try to re-implement it and see if that helps. Edit: nevermind, I think I figuredit out.