Where should I shuffle?

I have a DataPipe that look like the following. I’d like to shuffle it, but I am unsure when to add the shuffle in for optimal performance. Right now, I am doing it after the list_files_by_fsspec (as shown) because that’s been reasonable wrt timing. However, this is definitely suboptimal because it means only the tars are shuffled and not the examples after the ExpandDataSource(dp) call, which will make it so multiple examples from the same wav are added to the dataset.

Is there a better way to do this that optimizes both of these?

def groupby(row):
    return os.path.basename(row[0]).split(".")[0]

def process(row):
    if row[0][0].endswith('json'):
        stream_json_wrapper = row[0][1]
        stream_wav_wrapper = row[1][1]
    else:
        stream_json_wrapper = row[1][1]
        stream_wav_wrapper = row[0][1]
    labels = stream_json_wrapper.read()
    labels = json.loads(labels.decode('utf-8'))
    wav = io.BytesIO(stream_wav_wrapper.read())
    wav, _ = torchaudio.load(wav)
    return wav, labels


dp = IterableWrapper(["s3://<bucket>/train"])
dp = dp.list_files_by_fsspec()
dp = dp.shuffle()
dp = dp.open_files_by_fsspec(mode="rb")
dp = dp.load_from_tar("r")
dp = dp.groupby(groupby, group_size=2, guaranteed_group_size=2)
dp = dp.map(process))
dp = ExpandDataSource(dp)