Concat Shuffled for DataPipes

Say I have a number of files, and I want to load all the data and shuffle it. The data would be too big for my memory, but I could shuffle the files individually, and then merge them using an “external shuffle”.

I imagine it would look something like `FileLister(".").open_files().map(my_parser).in_batch_shuffle().concat_shuffled()`.

So if `FileLister(".").open_files().map(my_parser)` gives `[[1,2,3],[4,5,6]]`, then after `in_batch_shuffle()` we may have `[[3,1,2],[4,6,5]]`, and after `concat_shuffled()` we'll have `[4, 6, 3, 1, 5, 2]`.
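The per-batch step is easy to sketch in plain Python (the names above are my imagined API, not anything that exists in DataPipes today):

```python
import random

batches = [[1, 2, 3], [4, 5, 6]]

# in_batch_shuffle(): shuffle each batch independently, e.g.
# producing [[3, 1, 2], [4, 6, 5]].
shuffled = [random.sample(b, len(b)) for b in batches]

# concat_shuffled() would then merge the batches into a single
# stream that is a random interleaving of them.
```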

Right now I can just do an in-memory buffered shuffle (e.g. `.shuffle(buffer_size=N)`), but that requires a fixed buffer size, so it won't be a true shuffle of my data.
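To see why a fixed buffer isn't a true shuffle, here is a minimal sketch of a buffered shuffle (my own simplification; torchdata's actual `Shuffler` semantics may differ): with buffer size B, the first element emitted must come from the first B input elements, so far-apart elements can never trade places.

```python
import random

def buffered_shuffle(iterable, buffer_size):
    # Fill a fixed-size buffer, then repeatedly emit a random
    # buffered element and replace it with the next input element.
    buf = []
    for item in iterable:
        if len(buf) < buffer_size:
            buf.append(item)
        else:
            i = random.randrange(buffer_size)
            yield buf[i]
            buf[i] = item
    # Drain whatever is left in the buffer, in random order.
    random.shuffle(buf)
    yield from buf
```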

Btw, an external shuffle is pretty easy to implement with something like this:

import random

def cshuf(x, y):
  # Pick the next element from x with probability proportional to
  # its remaining length, otherwise from y. This gives a uniform
  # interleaving that preserves each list's internal order.
  while x or y:
    px = len(x) / (len(x) + len(y))
    if random.random() < px:
      yield x.pop(0)
    else:
      yield y.pop(0)
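The same idea generalizes to merging any number of pre-shuffled lists. A self-contained sketch (the `concat_shuffled` name is my own; this is not an existing DataPipe):

```python
import random

def concat_shuffled(lists):
    # External shuffle: repeatedly pick a source list with
    # probability proportional to its remaining length, so the
    # output is a uniform random interleaving that preserves
    # each list's internal order.
    lists = [l for l in lists if l]
    while lists:
        r = random.randrange(sum(len(l) for l in lists))
        for i, l in enumerate(lists):
            if r < len(l):
                yield l.pop(0)
                if not l:
                    del lists[i]
                break
            r -= len(l)
```

For example, `list(concat_shuffled([[3, 1, 2], [4, 6, 5]]))` yields all six elements with `3, 1, 2` and `4, 6, 5` each appearing in their original relative order.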

Is there already a way to do this with DataPipes? Or is there a way I can implement it?