Data loading for text / audio / video / procedurally generated streams

Is there a way, with an iterable dataset or a DataLoader, to stream multiple videos in a temporally coherent manner?

I have this piece of code that seems to work, but it is completely custom, so I guess it has many caveats.

class MultiStreamer(object):
    """Stream temporally coherent batches from several worker processes.

    Despite the ``num_threads`` parameter name, workers are
    ``multiprocessing.Process`` instances. Each worker owns one environment
    built by ``make_env`` and writes consecutive frames for
    ``batchsize // num_threads`` streams into a shared-memory ring of
    ``max_q_size`` slots. Iterating the object assembles one batch per step by
    taking exactly one slot from every worker, in worker order, so row ``r``
    of consecutive batches always comes from the same stream.

    Args:
        make_env: Callable invoked in each worker as ``make_env(num=k)``. The
            returned object must expose ``next(out)``; presumably it fills
            ``out`` (shape ``(k, *array_dim)``) in place and returns a dict
            mapping keys to lists -- TODO confirm against the env
            implementation.
        array_dim: Shape of a single sample (one stream, one time step).
        batchsize: Total number of streams per batch. Must be divisible by
            ``num_threads``.
        max_q_size: Number of slots in each worker's ring buffer and the
            capacity of its ready-queue.
        num_threads: Number of worker processes.
        max_iter: Maximum number of batches yielded per iteration.

    Raises:
        ValueError: If ``batchsize`` is not a multiple of ``num_threads``.

    Notes:
        * ``batch['data']`` is a view of an internal buffer that is
          overwritten on the next step; copy it if it must outlive the step.
        * Each worker's array lock is acquired by the worker and released by
          the consumer, so a worker produces at most one slot ahead of the
          consumer regardless of ``max_q_size``.
        * NOTE(review): the NumPy views handed to the workers only alias the
          shared memory if the process start method is ``fork``; under
          ``spawn`` they would presumably be pickled as copies -- confirm on
          the target platform.
        * Queue contents and lock state are not reset between iterations, so
          iterating the same instance more than once is not reliable.
    """

    def __init__(self, make_env, array_dim, batchsize, max_q_size, num_threads, max_iter=int(1e6)):
        # Fail fast: otherwise the reshape to (batchsize, *array_dim) in
        # __iter__ fails only once the first batch has been produced.
        if batchsize % num_threads != 0:
            raise ValueError(
                "batchsize (%d) must be divisible by num_threads (%d)"
                % (batchsize, num_threads)
            )

        # One ready-queue per worker; carries (slot index, info dict).
        self.readyQs = [mp.Queue(maxsize=max_q_size) for _ in range(num_threads)]
        self.array_dim = array_dim
        self.num_threads = num_threads
        self.num_videos_per_thread = batchsize // num_threads
        self.max_q_size = max_q_size
        self.batchsize = batchsize
        self.make_env = make_env

        # Consumer-side staging buffer, reused for every yielded batch.
        self.batch = np.zeros(
            (self.num_threads, self.num_videos_per_thread, *array_dim),
            dtype=np.float32,
        )

        # Shape of one worker's ring: max_q_size slots of per-worker frames.
        ring_shape = (self.max_q_size, self.num_videos_per_thread, *array_dim)

        # Materialised as a list (the original generator was exhausted by the
        # very next statement, leaving self.m_arrays unusable).
        self.m_arrays = [
            mp.Array('f', int(np.prod(ring_shape)), lock=mp.Lock())
            for _ in range(num_threads)
        ]
        # Pair every shared array with a NumPy view onto the same memory.
        self.arrays = [
            (m, np.frombuffer(m.get_obj(), dtype='f').reshape(ring_shape))
            for m in self.m_arrays
        ]
        self.max_iter = max_iter

    def multi_frame_stream(self, i, m, n, shape):
        """Worker loop: produce frames into the shared ring forever.

        Args:
            i: Index of this worker (selects its ready-queue).
            m: Synchronized shared array; its lock is used as the hand-off
                token with the consumer.
            n: NumPy view of ``m`` with shape
                ``(max_q_size, num_videos_per_thread, *array_dim)``.
            shape: Unused; kept for interface compatibility.
        """
        group = self.make_env(num=self.num_videos_per_thread)
        j = 0
        while True:
            # Blocks until the consumer has released the previous slot.
            m.acquire()
            info = group.next(n[j])
            self.readyQs[i].put((j, info))
            j = (j + 1) % self.max_q_size

    def __iter__(self):
        """Start the workers and yield up to ``max_iter`` batches.

        Yields:
            ``defaultdict(list)`` holding the concatenated per-worker info
            lists plus ``'data'``, an array of shape
            ``(batchsize, *array_dim)``.

        Workers are unregistered from the signal handler, terminated and
        joined when the generator finishes, is closed, or is
        garbage-collected.
        """
        procs = [
            mp.Process(target=self.multi_frame_stream,
                       args=(i, m, n, self.array_dim), daemon=True)
            for i, (m, n) in enumerate(self.arrays)
        ]
        for p in procs:
            p.start()
        _utils.signal_handling._set_worker_pids(id(self), tuple(w.pid for w in procs))
        _utils.signal_handling._set_SIGCHLD_handler()
        print('Start Streaming')
        try:
            for _ in range(self.max_iter):
                batch = defaultdict(list)
                for t in range(self.num_threads):
                    j, infos = self.readyQs[t].get()
                    m, arr = self.arrays[t]
                    # Copy out of shared memory before handing the lock back.
                    self.batch[t] = arr[j]
                    for k, v in infos.items():
                        batch[k] += v
                    m.release()
                batch['data'] = self.batch.reshape(self.batchsize, *self.array_dim)
                yield batch
        finally:
            # Unregister first so the SIGCHLD handler does not report the
            # deliberate termination below as a worker failure.
            _utils.signal_handling._remove_worker_pids(id(self))
            for p in procs:
                p.terminate()
            for p in procs:
                p.join()