Understanding minimum example for torch.multiprocessing

I would like to parallelize some operations in the forward function to address an issue similar to the one here. I have not been able to find a solution to that, so my search converged on trying to parallelize. Does this make sense?
So, I am following this tutorial. I am new to multiprocessing, so I am starting with a basic task. Based on the tutorial, here is my code:

import torch
import os
import torch.distributed as dist
import torch.multiprocessing as mp

def run(rank, size):
    """ Distributed function to be implemented later. """
    print(f'rank {rank} of {size}')

def init_process(rank, size, data, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)

def processes(data):
    size = len(data)
    processes = []
    mp.set_start_method("spawn")

    for rank in range(size):
        p = mp.Process(target=init_process, args=(rank, size, data, run))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()


if __name__ == "__main__":
    size_vector = 133
    part = int(size_vector/8)
    indices = torch.arange(size_vector)
    split_data = torch.split(indices, part)
    print(split_data)
    processes(split_data)

If I run the code above, I get the error ValueError: bad value(s) in fds_to_keep with the following traceback:

Traceback (most recent call last):
  File "code/torch_dist_tutorial.py", line 58, in <module>
    processes(numbers)
  File "code/torch_dist_tutorial.py", line 46, in processes
    p.start()
  File "Public/anaconda3/envs/dev/lib/python3.8/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "Public/anaconda3/envs/dev/lib/python3.8/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "Public/anaconda3/envs/dev/lib/python3.8/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "Public/anaconda3/envs/dev/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "Public/anaconda3/envs/dev/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "Public/anaconda3/envs/dev/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 58, in _launch
    self.pid = util.spawnv_passfds(spawn.get_executable(),
  File "Public/anaconda3/envs/dev/lib/python3.8/multiprocessing/util.py", line 452, in spawnv_passfds
    return _posixsubprocess.fork_exec(

If I do not pass data to
mp.Process(target=init_process, args=(rank, size, data, run))
then the script works. So I do not understand why I cannot even pass a tensor to the mp.Process call.
Any comments on what I might be doing wrong are very much appreciated. Maybe I am just missing some multiprocessing basics.

Ok, I found out what the issue was. For future reference, for people who, like me, don’t understand this:
Apparently, you cannot pass a tensor to mp.Process that shares data with tensors going to other processes. If I add a line and substitute the mp.Process arguments accordingly,

data_i = data[rank]
p = mp.Process(target=init_process, args=(rank, size, data_i, run))

Then the processes can use that data.
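My understanding (an assumption on my part, not something the tutorial states) is that torch.split returns views into the original tensor’s storage, so the chunks all share one underlying buffer; indexing out a single chunk per process (or cloning it) avoids handing every process the shared storage. A minimal sketch showing the shared storage:

```python
import torch

indices = torch.arange(133)
chunks = torch.split(indices, 16)

# Each chunk is a view: the first chunk starts at the same memory
# address as the parent tensor.
print(chunks[0].data_ptr() == indices.data_ptr())  # True

# .clone() copies the chunk into its own storage, so it no longer
# shares a buffer with the parent or with the other chunks.
independent = chunks[0].clone()
print(independent.data_ptr() == indices.data_ptr())  # False
```

So if passing data[rank] directly still gave trouble, data[rank].clone() would be the next thing I would try.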
Still, I am not sure why, in the example repo of the multiprocessing best practices, they can pass the whole dataset:

dataset1 = datasets.MNIST('../data', train=True, download=True,
                       transform=transform)

If there is some knowledgeable user, would be helpful if they can explain this a bit more.


I’m also having this issue. How can I pass the whole dataset to mp.Process(args=(whole_dataset,…))?

Thanks a lot!