I have a DataPipe and DataLoader that look like the following. When I iterate through the datapipe dp directly, it’s reasonably fast, even if I add a batch(16) on the end. However, when I iterate through it using the DataLoader, it never produces anything (a sketch of both loops follows the code below). What’s going on? I would like to use the DataLoader so that I can take advantage of having multiple workers.
import io
import json
import os

import torchaudio
from torch.utils.data import DataLoader
from torchdata.datapipes.iter import IterableWrapper


def groupby(row):
    # Pair up the .json and .wav entries that share a basename.
    return os.path.basename(row[0]).split(".")[0]


def process(row):
    # Each group is a pair of (path, stream) tuples; work out which is which.
    if row[0][0].endswith('json'):
        stream_json_wrapper = row[0][1]
        stream_wav_wrapper = row[1][1]
    else:
        stream_json_wrapper = row[1][1]
        stream_wav_wrapper = row[0][1]
    labels = stream_json_wrapper.read()
    labels = json.loads(labels.decode('utf-8'))
    wav = io.BytesIO(stream_wav_wrapper.read())
    wav, _ = torchaudio.load(wav)
    return wav, labels


dp = IterableWrapper(["s3://<bucket>/val"])
dp = dp.list_files_by_fsspec()
dp = dp.shuffle()
dp = dp.open_files_by_fsspec(mode="rb")
dp = dp.load_from_tar("r")
dp = dp.groupby(groupby, group_size=2, guaranteed_group_size=2)
dp = dp.map(process)
dl = DataLoader(dataset=dp, batch_size=16, num_workers=4)
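For context, here’s a minimal sketch of the two iteration patterns I described above (the loop bodies are just placeholders):

# Iterating the datapipe directly is reasonably fast,
# even with a batch(16) added on the end:
for wav, labels in dp:
    pass

for batch in dp.batch(16):
    pass

# Iterating through the DataLoader never produces anything:
for batch in dl:
    print("got a batch")  # never reached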