I would like to parallelize some operations in the forward function to address an issue similar to the one described here. I have not been able to find a solution to that issue, and my attempts have converged on trying to parallelize. Does this make sense?
So, I am following this tutorial. I am new to multiprocessing, so I am starting with a basic task. Based on the tutorial, here is my code:
import torch
import os
import torch.distributed as dist
import torch.multiprocessing as mp


def run(rank, size):
    """ Distributed function to be implemented later. """
    print(f'rank {rank} of {size}')


def init_process(rank, size, data, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


def processes(data):
    size = len(data)
    processes = []
    mp.set_start_method("spawn")
    for rank in range(size):
        p = mp.Process(target=init_process, args=(rank, size, data, run))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()


if __name__ == "__main__":
    size_vector = 133
    part = int(size_vector/8)
    indices = torch.arange(size_vector)
    split_data = torch.split(indices, part)
    print(split_data)
    processes(split_data)
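For reference, here is a small pure-Python sketch of the chunk sizes that the split above produces. It assumes, as the torch.split documentation describes, that an integer second argument is the size of each chunk rather than the number of chunks. The helper name split_sizes is hypothetical and not part of PyTorch.

```python
def split_sizes(total, chunk):
    """Lengths of the pieces when `total` items are cut into pieces of `chunk`.

    Assumption: this mirrors torch.split(tensor, chunk) with an integer
    argument, where the last piece is shorter if `total` is not divisible.
    """
    return [min(chunk, total - start) for start in range(0, total, chunk)]


size_vector = 133
part = int(size_vector / 8)              # chunk size, as in the script above
sizes = split_sizes(size_vector, part)   # one entry per chunk
print(part, len(sizes), sizes)
```

Under that assumption, len(data) in the script is the number of chunks, so this also gives the number of processes the loop would start. If a fixed number of pieces is wanted instead, torch.chunk takes a chunk count rather than a chunk size.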
If I run the code above, I get the error ValueError: bad value(s) in fds_to_keep, with the following traceback:
Traceback (most recent call last):
  File "code/torch_dist_tutorial.py", line 58, in <module>
    processes(numbers)
  File "code/torch_dist_tutorial.py", line 46, in processes
    p.start()
  File "Public/anaconda3/envs/dev/lib/python3.8/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "Public/anaconda3/envs/dev/lib/python3.8/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "Public/anaconda3/envs/dev/lib/python3.8/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "Public/anaconda3/envs/dev/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "Public/anaconda3/envs/dev/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "Public/anaconda3/envs/dev/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 58, in _launch
    self.pid = util.spawnv_passfds(spawn.get_executable(),
  File "Public/anaconda3/envs/dev/lib/python3.8/multiprocessing/util.py", line 452, in spawnv_passfds
    return _posixsubprocess.fork_exec(
If I remove data from the arguments in
    mp.Process(target=init_process, args=(rank, size, data, run))
then the script works. So I do not understand why I cannot even pass some tensors to mp.Process.
Any comments on what I might be doing wrong would be much appreciated. Maybe I am simply missing some multiprocessing basics.