Understanding DataLoader2 and MultiProcessingReadingService

I have this minimal example:

import numpy as np

from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
from torchdata.datapipes.iter import IterDataPipe

class Dataset(IterDataPipe):
    def __len__(self):
        return 2

    def __getitem__(self, idx):
        return np.arange(5)+idx

    def __iter__(self):
        for i in range(len(self)):
            yield self[i]

datapipe = Dataset()
reading_service = MultiProcessingReadingService(num_workers=2)
data = DataLoader2(datapipe, reading_service=reading_service)

for x in data:
  print(x)

This is a simple dataset with 2 items to load (“0” and “1”). When num_workers is 1 it works as expected and I get the output “0 1”. When the number of workers is > 1, the output is replicated by the number of workers, so for num_workers = 2 it is “0 0 1 1”, and for 5 it is “0 0 0 0 0 1 1 1 1 1”. I tried to make sense of it, but the prefetching seems to start the worker processes and they all do the same work. Is there something wrong in my example, or do I have a misunderstanding of how it is supposed to work? Since it is suggested to use IterDataPipe instead of MapDataPipe, how is it even supposed to work with multiple workers?
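For what it’s worth, the duplication can be reproduced without torchdata at all: each worker process gets its own full copy of the pipe and iterates it end to end, so every item appears once per worker. A minimal pure-Python sketch of that effect (the generator and worker count here are stand-ins, not torchdata objects):

```python
# Stand-in for the datapipe: yields every item, with no sharding logic.
def full_pipe():
    yield from range(2)

# Each of num_workers worker processes runs its own copy of full_pipe,
# so the merged stream contains every item num_workers times.
num_workers = 2
merged = sorted(x for _ in range(num_workers) for x in full_pipe())
print(merged)  # the "0 0 1 1" pattern described above
```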

This behavior might be expected if I’m understanding the docs correctly:

When num_workers > 0, each worker process will have a different copy of the DataPipe object, so it is often desired to configure each copy independently to avoid having duplicate data returned from the workers. get_worker_info(), when called in a worker process, returns information about the worker. It can be used in either the dataset’s __iter__() method or the DataLoader ‘s worker_init_fn option to modify each copy’s behavior.

You might thus want to use the worker info to e.g. define the start index and the stride.
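The start-index-and-stride idea can be sketched like this (a sketch only; `worker_id` and `num_workers` are plain parameters standing in for whatever the worker info would provide). Each worker starts at its own id and steps by the total worker count, so the shards are disjoint and together cover the whole dataset:

```python
def shard_iter(n_items, worker_id, num_workers):
    # Start at this worker's index and step by the total worker count,
    # so no two workers yield the same item.
    for i in range(worker_id, n_items, num_workers):
        yield i

# Two workers splitting 6 items between them.
shards = [list(shard_iter(6, w, 2)) for w in range(2)]
print(shards)  # [[0, 2, 4], [1, 3, 5]]
```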

Thank you for pointing this out.

  • If I call get_worker_info() (like in data/helpers.py at 84587ff57575fd47fcae61635a3f4ffc1e639941 · pytorch/data · GitHub) it returns None.
  • If I use the _worker_init_fn, the datapipe has already been wrapped in a “PrefetcherIterDataPipe”, so I can’t pass any information to my datapipe to distribute the data.
  • If I rewrite everything as a MapDataPipe, I get “AttributeError: ‘Dataset’ object has no attribute ‘prefetch’”.

Is there a working example combining prefetching and multiprocessing, with either MapDataPipe or IterDataPipe, or is this feature still too “beta”?