I have a DataPipe that looks like the following. I'd like to shuffle it, but I am unsure where to add the shuffle for optimal performance. Right now I am doing it right after
list_files_by_fsspec (as shown) because that has been reasonable in terms of timing. However, this is definitely suboptimal: it means only the tars are shuffled, not the individual examples produced by the
ExpandDataSource(dp) call, so multiple examples from the same wav end up next to each other in the dataset.
Is there a better way to do this that gets both good example-level shuffling and good throughput?
import io
import json
import os

import torchaudio
from torchdata.datapipes.iter import IterableWrapper


def groupby(row):
    # Key each (path, stream) pair from the tar by its basename without
    # extension, so each .wav is grouped with its matching .json.
    return os.path.basename(row[0]).split(".")[0]


def process(row):
    # row is a group of two (path, stream) pairs: one .json, one .wav.
    if row[0][0].endswith("json"):
        stream_json_wrapper = row[0][1]
        stream_wav_wrapper = row[1][1]
    else:
        stream_json_wrapper = row[1][1]
        stream_wav_wrapper = row[0][1]
    labels = stream_json_wrapper.read()
    labels = json.loads(labels.decode("utf-8"))
    wav = io.BytesIO(stream_wav_wrapper.read())
    wav, _ = torchaudio.load(wav)
    return wav, labels


dp = IterableWrapper(["s3://<bucket>/train"])
dp = dp.list_files_by_fsspec()
dp = dp.shuffle()
dp = dp.open_files_by_fsspec(mode="rb")
dp = dp.load_from_tar("r")
dp = dp.groupby(groupby, group_size=2, guaranteed_group_size=2)
dp = dp.map(process)
dp = ExpandDataSource(dp)
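To make the concern concrete, here is a minimal pure-Python sketch (no torchdata; the shard names, `expand`, and `buffered_shuffle` are hypothetical stand-ins) showing that shuffling only the shard list still leaves each shard's examples contiguous, whereas a fixed-buffer streaming shuffle applied after expansion, which is roughly what a second `dp.shuffle(buffer_size=...)` after `ExpandDataSource` would do, interleaves them:

```python
import random
from itertools import islice


def expand(shards):
    # Stand-in for ExpandDataSource: each shard yields several examples.
    for shard in shards:
        for i in range(3):
            yield f"{shard}/example{i}"


def buffered_shuffle(iterable, buffer_size, rng):
    # Streaming shuffle over a fixed-size buffer: fill the buffer, then
    # for each incoming item emit a random buffered item and replace it.
    it = iter(iterable)
    buf = list(islice(it, buffer_size))
    for item in it:
        idx = rng.randrange(len(buf))
        yield buf[idx]
        buf[idx] = item
    rng.shuffle(buf)
    yield from buf


rng = random.Random(0)
shards = ["tar0", "tar1", "tar2"]
rng.shuffle(shards)  # shard-level shuffle only, like shuffle() after listing

shard_only = list(expand(shards))
# Examples from the same shard are still adjacent: within each block of 3,
# consecutive items share a shard prefix.
assert all(
    shard_only[i].rsplit("/", 1)[0] == shard_only[i + 1].rsplit("/", 1)[0]
    for i in range(len(shard_only) - 1)
    if i % 3 != 2
)

# A buffered shuffle after expansion mixes examples across shards while
# keeping memory bounded; it is a permutation of the same examples.
interleaved = list(buffered_shuffle(expand(shards), buffer_size=4, rng=rng))
assert sorted(interleaved) == sorted(shard_only)
```

The trade-off is the usual one: a small buffer only mixes locally, a large buffer approaches a full shuffle but costs memory, so the shard-level shuffle and an example-level buffered shuffle are typically combined rather than chosen between.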