Distributed model trained on multiple nodes

Hi there,

I’m currently trying to run a demo of a PyTorch model trained on 2 nodes, where each node contains 2 GPUs. It is based on the tutorial, and I’m using Open MPI to handle the communication. The backend for torch.distributed.init_process_group is ‘mpi’ (I followed the tutorials provided to build PyTorch from source). I am forced to use the ‘mpi’ backend and Open MPI since these are the only compatible options available on the cluster I have access to.
Here is the main function where I use the mpi4py library to establish the connection between Open MPI and PyTorch:

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from mpi4py import MPI

def example(local_offset, global_rank, world_size, hostname):
    # Map the per-node process index to a global rank (2 GPUs per node).
    rank = local_offset
    if global_rank == 1:
        rank += 2
    print(f'checkpoint1 with rank {rank} and world_size {world_size}')
    dist.init_process_group(backend="mpi", rank=rank, world_size=world_size)
    print('checkpoint2')
    model = nn.Linear(10, 10).to(rank)
    print('checkpoint3')
    ddp_model = DDP(model, device_ids=[rank])  # , output_device=rank)
    print('checkpoint4')
    loss_fn = nn.MSELoss()
    print('checkpoint5')
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
    print('checkpoint6')

    outputs = ddp_model(torch.randn(20, 10).to(rank))
    print('checkpoint7')
    labels = torch.randn(20, 10).to(rank)
    print('checkpoint8')

    loss_fn(outputs, labels).backward()
    print('checkpoint9')
    optimizer.step()

def main():
    comm = MPI.COMM_WORLD
    world_size = comm.Get_size()    # number of MPI ranks (one per node)
    world_rank = comm.Get_rank()
    hostname = MPI.Get_processor_name()
    print(f"\nI am {world_rank} of {world_size} in {hostname}")

    curr_env = os.environ.copy()
    curr_env['MASTER_ADDR'] = 'hostname_addr'
    curr_env['MASTER_PORT'] = '12345'
    curr_env['WORLD_SIZE'] = str(world_size * 2)

    # Spawn the worker processes on this node (2 here, one per GPU).
    mp.spawn(example,
             args=(world_rank, world_size * 2, hostname),
             nprocs=world_size,
             join=True)

if __name__ == "__main__":
    main()

Currently, I get the following error:

I am 1 of 2 in node050
I am 0 of 2 in node049
checkpoint1 with rank 1 and world_size 4
checkpoint1 with rank 0 and world_size 4
checkpoint1 with rank 2 and world_size 4
checkpoint1 with rank 3 and world_size 4
--------------------------------------------------------------------------
Open MPI has detected that this process has attempted to initialize
MPI (via MPI_INIT or MPI_INIT_THREAD) more than once. This is
erroneous.
--------------------------------------------------------------------------
[node049:12317] *** An error occurred in MPI_Init_thread
[node049:12317] *** reported by process [3237216257,0]
[node049:12317] *** on a NULL communicator
[node049:12317] *** Unknown error
[node049:12317] *** MPI_ERRORS_ARE_FATAL (processes in this communicator will now abort,
[node049:12317] ***     and potentially your MPI job)
Traceback (most recent call last):
  File "/tutorial_prof_students/distrib_data_parallel.py", line 116, in <module>
        main()
  File "/tutorial_prof_students/distrib_data_parallel.py", line 113, in main
        join=True)
  File "/pyvenv/condaMPI2/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 247, in spawn
        return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/pyvenv/condaMPI2/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 205, in start_processes
        while not context.join():
  File "/pyvenv/condaMPI2/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 160, in join
        exit_code=exitcode
torch.multiprocessing.spawn.ProcessExitedException: process 0 terminated with exit code 1
[fs0:23200] 3 more processes have sent help message help-mpi-runtime.txt / mpi_init: invoked multiple times
[fs0:23200] Set MCA parameter "orte_base_help_aggregate" to 0 to see all help / error messages
[fs0:23200] 3 more processes have sent help message help-mpi-errors.txt / mpi_errors_are_fatal unknown handle

I’ve tried several recommendations from other discussions (like here), but they didn’t work.

What am I doing incorrectly?
Any tips and help would be great!
Thanks

Hi

Some questions:

  • could you provide the code of your example() that contains torch.distributed.init_process_group?
  • what is world_size in your example? (it seems like on each node you should spawn a number of processes equal to the number of GPUs, i.e. 2)
  • could you add logging before and after torch.distributed.init_process_group and check whether the process group is being initialized correctly?

Yes, I’ve updated the code above with the example function and some logging. The issue is at torch.distributed.init_process_group, as the error log shows.

Regarding world_size, I specify 4 processes in total, 2 per node.

I see, the problem is with MPI initialization. Could you try the instructions here:

https://pytorch.org/tutorials/intermediate/dist_tuto.html (under MPI Backend).

I.e.:

1. Replace the content under `if __name__ == '__main__':` with `init_process(0, 0, run, backend='mpi')` (roughly as sketched below).
2. Run `mpirun -n 4 python myscript.py`.
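
Roughly, the pattern I mean is the following sketch (based on my reading of that section of the tutorial; `run` stands in for your per-rank training function, and the dummy 0, 0 arguments are ignored by the MPI backend, which gets the real rank and world size from mpirun):

import torch.distributed as dist

def run(rank, size):
    # Per-rank work goes here (e.g. your example() logic).
    print(f"Hello from rank {rank} of {size}")

def init_process(rank, size, fn, backend='mpi'):
    """Initialize the process group and call the training function."""
    dist.init_process_group(backend, rank=rank, world_size=size)
    # With the MPI backend the rank/size arguments are ignored, so
    # query the actual values from the process group.
    fn(dist.get_rank(), dist.get_world_size())

if __name__ == "__main__":
    # Launched via: mpirun -n 4 python myscript.py
    init_process(0, 0, run, backend='mpi')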

Yes, I’ve updated the code accordingly and ran it with 2 GPUs (1 per node), but now I get a similar but less descriptive error:

checkpoint1 with rank 0 and world_size 0
checkpoint1 with rank 0 and world_size 0
--------------------------------------------------------------------------
Open MPI has detected that this process has attempted to initialize
MPI (via MPI_INIT or MPI_INIT_THREAD) more than once.  This is
erroneous.
--------------------------------------------------------------------------
[node024:18276] *** An error occurred in MPI_Init_thread
[node024:18276] *** reported by process [3619422209,0]
[node024:18276] *** on a NULL communicator
[node024:18276] *** Unknown error
[node024:18276] *** MPI_ERRORS_ARE_FATAL (processes in this communicator will now abort,
[node024:18276] ***	and potentially your MPI job)
[fs0:19944] 1 more process has sent help message help-mpi-runtime.txt / mpi_init: invoked multiple times
[fs0:19944] Set MCA parameter "orte_base_help_aggregate" to 0 to see all help / error messages
[fs0:19944] 1 more process has sent help message help-mpi-errors.txt / mpi_errors_are_fatal unknown handle

Note that the logging shown above is printed before dist.init_process_group is even called.
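
One thing I notice (just a guess on my part): `from mpi4py import MPI` initializes MPI at import time by default, so by the time dist.init_process_group(backend="mpi") runs, MPI_Init has already been called once. A sketch of what I could try, assuming mpi4py’s rc options behave as documented:

# Untested sketch: stop mpi4py from calling MPI_Init at import time so that
# torch.distributed's MPI backend performs the only initialization.
import mpi4py
mpi4py.rc.initialize = False   # must be set before "from mpi4py import MPI"
mpi4py.rc.finalize = False

from mpi4py import MPI         # no automatic MPI_Init now
import torch.distributed as dist

dist.init_process_group(backend="mpi")   # rank/world_size come from mpirun
print(f"rank {dist.get_rank()} of {dist.get_world_size()}")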

If I perform the basic checks on both nodes, I get the expected output:

1.8.0a0+262bd64
1.8.0a0+262bd64
Hello from P0 on node024 out of 20
<torch.cuda.device object at 0x2aaafd2efc10>
1
GeForce GTX TITAN X
True
0
-------------------------
Hello from P1 on node025 out of 20
<torch.cuda.device object at 0x2aaafd2efd50>
1
GeForce GTX TITAN X
True
0
-------------------------
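
For completeness, the "basic checks" are roughly the following sketch, run with mpirun on both nodes (the exact script isn’t included above, so the details here are illustrative):

import torch
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
host = MPI.Get_processor_name()

print(torch.__version__)
print(f"Hello from P{rank} on {host} out of {size}")
print(torch.cuda.device(0))             # device handle
print(torch.cuda.device_count())        # visible GPUs on this node
print(torch.cuda.get_device_name(0))    # e.g. GeForce GTX TITAN X
print(torch.cuda.is_available())
print(torch.cuda.current_device())
print("-------------------------")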