Why is my DataLoader so slow when using DataPipe?

I have a DataPipe and DataLoader that look like the following.

When I include iterate through it using the datapipe dp, it’s reasonably fast, even if I add on a batch(16). However, when I iterate through it using the DataLoader, it never produces anything. What’s going on? I would like to use DataLoader so that I can take advantage of having multiple workers.

def groupby(row):
    return os.path.basename(row[0]).split(".")[0]

def process(row):
    if row[0][0].endswith('json'):
        stream_json_wrapper = row[0][1]
        stream_wav_wrapper = row[1][1]
    else:
        stream_json_wrapper = row[1][1]
        stream_wav_wrapper = row[0][1]
    labels = stream_json_wrapper.read()
    labels = json.loads(labels.decode('utf-8'))
    wav = io.BytesIO(stream_wav_wrapper.read())
    wav, _ = torchaudio.load(wav)
    return wav, labels


dp = IterableWrapper(["s3://<bucket>/val"])
dp = dp.list_files_by_fsspec()
dp = dp.shuffle()
dp = dp.open_files_by_fsspec(mode="rb")
dp = dp.load_from_tar("r")
dp = dp.groupby(groupby, group_size=2, guaranteed_group_size=2)
dp = dp.map(process))

dl = DataLoader(dataset=dp, batch_size=16, num_workers=4)
1 Like