I am trying to write a custom synchronization program to work with PyTorch. To achieve this, I need a parallel process / thread that waits for communication from other processes while another process does the computation.
My problem is that I am using the MPI backend and launch processes with:
mpirun -n 8 python mycode.py
Now within my program I use PyTorch's multiprocessing module to spawn 2 more processes. So in total I have 8 main processes ranked [0, 1, …, 7], and each of these has 2 sub-processes with local ranks [0, 1]. Here is my problem: how can I get the sub-processes to communicate? For example, I want sub-process 0 of main process 0 to use distributed send / receive to reach sub-process 0 of, say, process 3. The code in mycode.py looks like:
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def run(local_rank, rank, world_size):
    print("I was spawned")
    ts = torch.tensor([1.]).cuda()
    if rank == 0:
        ts += 20
        dist.send(ts, dst=1)  # I want this to be rank=1, not local_rank=1
    else:
        dist.recv(ts)
        print("Received Value: ", ts[0])

if __name__ == '__main__':
    # Initialize Process Group
    dist.init_process_group(backend="mpi")
    # get current process information
    world_size = dist.get_world_size()
    rank = dist.get_rank()
    # allocate GPU to this process
    torch.cuda.set_device(0)
    # spawn parallel workers
    mp.spawn(run, nprocs=2, args=(rank, world_size))
The above program errors out as follows:
I was spawned
I was spawned
Traceback (most recent call last):
File "code.py", line 36, in <module>
mp.spawn(run, nprocs= 2, args=(rank, world_size))
File "/home/usama/anaconda3/envs/thesis/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 167, in spawn
while not spawn_context.join():
File "/home/usama/anaconda3/envs/thesis/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 114, in join
raise Exception(msg)
Exception:
-- Process 0 terminated with the following error:
Traceback (most recent call last):
File "/home/usama/anaconda3/envs/thesis/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 19, in _wrap
fn(i, *args)
File "/home/usama/bin/work/test/code.py", line 17, in run
dist.send(ts, dst=1)
File "/home/usama/anaconda3/envs/thesis/lib/python3.7/site-packages/torch/distributed/distributed_c10d.py", line 660, in send
_check_default_pg()
File "/home/usama/anaconda3/envs/thesis/lib/python3.7/site-packages/torch/distributed/distributed_c10d.py", line 185, in _check_default_pg
"Default process group is not initialized"
AssertionError: Default process group is not initialized
So my basic question is: how do I initialize the default process group inside the spawned sub-processes, ideally reusing the one that the OpenMPI launch created in the parent? Any thoughts, please?
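For context, here is the kind of thing I imagine might work: give every spawned worker its own flat rank and let the workers form a second, flat process group among themselves. This is only a sketch of my idea, not working code; the gloo backend, the rendezvous address, and the flat_rank mapping are all my own assumptions:

```python
SUBPROCS = 2  # workers spawned by each MPI process

def flat_rank(mpi_rank, local_rank, subprocs=SUBPROCS):
    # Map (MPI rank, local rank) onto one flat rank space:
    # with 2 workers each, MPI rank 3 / local rank 0 -> flat rank 6.
    return mpi_rank * subprocs + local_rank

def run(local_rank, rank, world_size):
    import torch.distributed as dist  # imported inside the spawned worker

    # The MPI process group initialized in the parent is apparently not
    # visible here, so each worker would join a second group of its own.
    dist.init_process_group(
        backend="gloo",                         # assumption: gloo for the inner group
        init_method="tcp://<master-addr>:23456",  # assumption: a rendezvous all nodes can reach
        rank=flat_rank(rank, local_rank),
        world_size=world_size * SUBPROCS,
    )
```

With this mapping, sub-process 0 of main process 0 would be flat rank 0 and sub-process 0 of main process 3 would be flat rank 6, so dist.send(ts, dst=6) would address it directly. But I don't know whether running a gloo group alongside the MPI one is the intended way to do this.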
Note:
I have tried this with threads, and it should technically work, but I hit a reported bug when creating a CUDA tensor from a thread. The bug is reported here: https://github.com/pytorch/pytorch/issues/16559