I have this minimal example:
import numpy as np
from torchdata.datapipes.iter import IterDataPipe
from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService

class Dataset(IterDataPipe):
    def __len__(self):
        return 2

    def __getitem__(self, idx):
        return np.arange(5) + idx

    def __iter__(self):
        for i in range(len(self)):
            yield self[i]

datapipe = Dataset()
reading_service = MultiProcessingReadingService(num_workers=2)
data = DataLoader2(datapipe, reading_service=reading_service)
for x in data:
    print(x)
This is a simple dataset with 2 items to load (“0” and “1”). With num_workers=1 it works as expected and I get the output “0 1”. When the number of workers is > 1, the output is replicated by the number of workers, so for num_workers=2 it is “0 0 1 1”, and for 5 it is “0 0 0 0 0 1 1 1 1 1”. I tried to make sense of it, but the prefetching seems to start the workers and they all iterate over the same data. Is there something wrong in my example, or do I have a misunderstanding of how it is supposed to work? Since it’s suggested to use IterDataPipe instead of MapDataPipe, how is it even supposed to work with multiple workers?
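For reference, here is a variant I am considering, assuming sharding_filter() is the intended mechanism for splitting elements across workers (I am not sure this is the right fix, so please correct me):

# Sketch of my guess: insert a sharding_filter() so each worker keeps only
# its own shard of the pipeline instead of a full replica.
# (Assumption: sharding_filter() is the right API for this.)
datapipe = Dataset().sharding_filter()  # Dataset as defined above
reading_service = MultiProcessingReadingService(num_workers=2)
data = DataLoader2(datapipe, reading_service=reading_service)
for x in data:
    print(x)  # if my guess is right, each item appears exactly once

Is this the expected pattern, or is there something else that makes an IterDataPipe cooperate with multiple workers?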