Multiprocessing data loader with manual numpy seed


I have implemented an IterableDataset that generates synthetic data according to a source of randomness in numpy. I need to control the generation process, thus I manually seed the random source. My code looks like:

import numpy as np
import torch
from torch.utils.data import IterableDataset, DataLoader

class RandDataset(IterableDataset):
    def __init__(self, seed=42):
        self.seed = seed
        self.data_rng = np.random.default_rng(seed)

    def __iter__(self):
        while True:
            yield self.data_rng.normal()

def collect_data(loader, epochs=10):
    DATA = []
    for idx, data in enumerate(loader):
        if idx == epochs:
            break
        DATA.append(data)  # without this, DATA stays empty
    return DATA

DATA0 = collect_data(DataLoader(RandDataset(seed=42), batch_size=32))
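As a sanity check (not part of my original code), the reproducibility this relies on can be verified with numpy alone: two generators built from the same seed emit the same stream.

```python
import numpy as np

# Hypothetical check: identical seeds give identical streams,
# which is what makes DATA0 reproducible across runs.
rng_a = np.random.default_rng(42)
rng_b = np.random.default_rng(42)
seq_a = [rng_a.normal() for _ in range(5)]
seq_b = [rng_b.normal() for _ in range(5)]
assert seq_a == seq_b
```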

My actual data-generating procedure is much more CPU-intensive than self.data_rng.normal(), so to speed up data generation I am using a multiprocessing data loader. However, each worker process receives its own copy of the dataset, including the RNG state, so the two workers produce duplicate batches:

dl = DataLoader(RandDataset(seed=42), num_workers=2, batch_size=32)
DATA2_rep = collect_data(dl)
torch.all(DATA2_rep[0] == DATA2_rep[1]) # result: tensor(True)
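The duplication can be demonstrated without torch: each worker gets a copy of the dataset object (via fork or pickle, depending on the start method), and a copied numpy Generator carries the same internal state, so both copies emit the same values. A minimal sketch of the mechanism:

```python
import pickle
import numpy as np

rng = np.random.default_rng(42)
# Simulate what multiprocessing does when it ships the dataset to a worker:
rng_copy = pickle.loads(pickle.dumps(rng))
# Both generators are in the same state, so they produce the same draw:
assert rng.normal() == rng_copy.normal()
```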

My current solution is to seed each worker differently according to the worker id.
I ended up doing something like:

def seed_worker(worker_id):
    worker_info = torch.utils.data.get_worker_info()
    dataset = worker_info.dataset  # this worker's copy of the dataset
    dataset.data_rng = np.random.default_rng(dataset.seed + 1000 * worker_id)

dl = DataLoader(RandDataset(seed=42), num_workers=2, batch_size=32, worker_init_fn=seed_worker)
DATA2_norep = collect_data(dl)
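A torch-free sketch (hypothetical, mirroring the worker_init_fn above) showing that the per-worker seed offset gives each worker a distinct stream:

```python
import numpy as np

base_seed = 42
# Offset the base seed by 1000 * worker_id for two simulated workers:
rngs = [np.random.default_rng(base_seed + 1000 * wid) for wid in range(2)]
seqs = [[rng.normal() for _ in range(5)] for rng in rngs]
# The two "workers" now draw from different streams:
assert seqs[0] != seqs[1]
```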

The sequence of generated batches DATA2_norep is reproducible on my machine and contains no repeated instances. However, by design the result depends on the number of workers.
My preferred solution would be a single shared source of randomness that the workers access sequentially, so that I get the same sequence with one or more workers. Is that possible?