Distributed training on multiple nodes

Hi,

As far as I know, PyTorch can do parallel training on multiple GPUs/CPUs on a single node without any issue, but I am not sure it is mature enough to do multi-node training without issues, considering asynchronous data parallelism. If multi-node training is supported, please provide me a simple example so I can test it out.

Thanks.

Can anyone suggest something, please?

Hey @hariram_manohar

There are multiple tools in PyTorch to facilitate distributed training, most notably DistributedDataParallel (DDP) for data parallelism and the RPC framework for model parallelism.

BTW, for distributed training questions, please use the “distributed” tag, so that we can get back to you promptly.

Hi Shen Li,

I hope what I need is RPC, since I want to do the training on multiple machines, but neither the tutorial nor the docs mention where the multiple servers are defined or where we need to run the code. Where do we define them? If there is a detailed blog about this, please let me know.

Basically, I am trying to do training on multiple machines for the code mentioned in Huge False Negatives.

I want to train a model for binary classification on multiple machines at the same time.

Thanks.

Hi @hariram_manohar

Both DDP and RPC can work on multiple machines. They operate on a group of processes (called a ProcessGroup), and it is up to the application how to place those processes (on a single machine or across multiple machines). You just need to call init_process_group or init_rpc with the proper arguments. The choice between DDP and RPC depends on whether you want to divide the input data or split the model.

Take this as an example. The code launches two processes on the same machine, but you can 1) set MASTER_ADDR to machine_1_ip and MASTER_PORT to e.g. 23456, and 2) run the example() function on the two machines. The two processes will rendezvous using the IP and port you provided, and DDP will then handle the communication for you.
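
As a minimal sketch of that rendezvous (machine_1_ip and port 23456 are placeholders for your own address), the setup inside example() might look roughly like this:

import os
import torch.distributed as dist

def example(rank, world_size):
    # Both machines must point at the same rendezvous address:
    # the IP of machine 1 (placeholder) and a free port.
    os.environ['MASTER_ADDR'] = 'machine_1_ip'
    os.environ['MASTER_PORT'] = '23456'

    # rank 0 runs on machine 1, rank 1 on machine 2, world_size is 2
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    # ... build the model, wrap it in DDP, train ...

    dist.destroy_process_group()

# On machine 1: example(rank=0, world_size=2)
# On machine 2: example(rank=1, world_size=2)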

Hi @mrshenli

I am getting the error below for the mentioned example. How do I disable CUDA?

  File "/apps/dl/torch/ddp.py", line 20, in example
	model = nn.Linear(10, 10).to(rank)
	
	  File "/apps/miniconda3/envs/torch/lib/python3.8/site-packages/torch/cuda/__init__.py", line 94, in _check_driver
	raise AssertionError("Torch not compiled with CUDA enabled")
AssertionError: Torch not compiled with CUDA enabled

I haven’t installed CUDA, and it is not required for my case either…

To avoid using CUDA, remove all .to(rank) calls and remove device_ids=[rank] from the DDP constructor (or set device_ids=None). Then DDP will only use the CPU.
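
For reference, a CPU-only construction with those two changes applied (assuming the gloo process group has already been initialized) would look roughly like this:

import torch
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP

# CPU-only variant: keep the model on the CPU and let DDP default to CPU.
model = nn.Linear(10, 10)   # no .to(rank)
ddp_model = DDP(model)      # no device_ids=[rank]

outputs = ddp_model(torch.randn(20, 10))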

Hi @mrshenli

Here are the errors I am getting.

Error on Master/primary server:

Traceback (most recent call last):
  File "ddp.py", line 48, in <module>
	main()
  File "ddp.py", line 42, in main
	mp.spawn(example,
  File "/apps/miniconda3/envs/torch/lib/python3.8/site-packages/torch/multiprocessing/spawn.py", line 171, in spawn
	while not spawn_context.join():
  File "/apps/miniconda3/envs/torch/lib/python3.8/site-packages/torch/multiprocessing/spawn.py", line 118, in join
	raise Exception(msg)
Exception:

-- Process 0 terminated with the following error:
Traceback (most recent call last):
  File "/apps/miniconda3/envs/torch/lib/python3.8/site-packages/torch/multiprocessing/spawn.py", line 19, in _wrap
	fn(i, *args)
  File "/apps/dl/torch/ddp.py", line 22, in example
	ddp_model = DDP(model)
  File "/apps/miniconda3/envs/torch/lib/python3.8/site-packages/torch/nn/parallel/distributed.py", line 301, in __init__
	self._distributed_broadcast_coalesced(
  File "/apps/miniconda3/envs/torch/lib/python3.8/site-packages/torch/nn/parallel/distributed.py", line 485, in _distributed_broadcast_coalesced
	dist._broadcast_coalesced(self.process_group, tensors, buffer_size)
RuntimeError: [/opt/conda/conda-bld/pytorch_1579022027171/work/third_party/gloo/gloo/transport/tcp/pair.cc:572] Connection closed by peer [SECONDARY_IP]:43924

ERROR on secondary:

terminate called after throwing an instance of 'gloo::EnforceNotMet'
  what():  [enforce fail at /opt/conda/conda-bld/pytorch_1579022027171/work/third_party/gloo/gloo/transport/tcp/device.cc:281] rv != -1. -1 vs -1. epoll_ctl: No such file or directory
Traceback (most recent call last):
  File "ddp.py", line 48, in <module>
	main()
  File "ddp.py", line 42, in main
	mp.spawn(example,
  File "/apps/miniconda3/envs/torch/lib/python3.8/site-packages/torch/multiprocessing/spawn.py", line 171, in spawn
	while not spawn_context.join():
  File "/apps/miniconda3/envs/torch/lib/python3.8/site-packages/torch/multiprocessing/spawn.py", line 105, in join
	raise Exception(
Exception: process 1 terminated with signal SIGABRT

Here is the relevant code on both servers:

Master:

def example(rank, world_size):
	os.environ['MASTER_ADDR'] = 'localhost'
	os.environ['MASTER_PORT'] = '12355'

secondary:

def example(rank, world_size):
	os.environ['MASTER_ADDR'] = 'master_ip'
	os.environ['MASTER_PORT'] = '12355'

  1. Can you try setting MASTER_ADDR to the same IP on both servers?
  2. Could you share your ddp.py? Is it the same as the one in the DDP note? If so, you need to modify rank, world_size, etc. accordingly.

Here is the ddp.py from the master… the only difference on the other server is that MASTER_ADDR is set to the master’s IP address.

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import os
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP


def cleanup():
	dist.destroy_process_group()

def example(rank, world_size):
	os.environ['MASTER_ADDR'] = 'localhost'
	os.environ['MASTER_PORT'] = '12355'

	# create default process group
	dist.init_process_group("gloo", rank=rank, world_size=world_size)
	# create local model
	model = nn.Linear(10, 10)
	# construct DDP model
	ddp_model = DDP(model)
	# define loss function and optimizer
	loss_fn = nn.MSELoss()
	optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

	# forward pass
	outputs = ddp_model(torch.randn(20, 10))
	labels = torch.randn(20, 10)
	# backward pass
	loss_fn(outputs, labels).backward()
	# update parameters
	optimizer.step()
	print(outputs)

	cleanup()



def main():
	world_size = 2
	mp.spawn(example,
		args=(world_size,),
		nprocs=world_size,
		join=True)

if __name__=="__main__":
	main()

That example code was intended to run on a single node. If you run it on multiple nodes as-is, there are at least two problems:

  1. Incorrect world_size and rank: say you run this on two different nodes. Each node would then try to spawn two processes, so you would end up with four process group instances, but world_size would still be two and the ranks would be 0, 1, 0, 1 instead of 0, 1, 2, 3, which messes up the rendezvous algorithm. To fix this, you need to pass the correct rank and world_size to each example(rank, world_size) invocation.
  2. Inconsistent MASTER_ADDR: the master is using localhost while the others are using the master’s IP. Can you change MASTER_ADDR so that every node uses the same IP?

Yeah, I have changed localhost to master_ip.

Is it fine if I change it like this?

master:

world_size = 2
mp.spawn(example,
	args=(0,1),
	nprocs=world_size,
	join=True)

other:

world_size = 2
mp.spawn(example,
	args=(2,3),
	nprocs=world_size,
	join=True)

  1. This does not look correct to me. world_size is the global size of the process group, which should be 4 in this case, but nprocs should be 2 on each node, since each node spawns two processes.
  2. The args don’t match the signature def example(rank, world_size). With these calls, mp.spawn would pass (0, 0, 1) and (1, 0, 1) to the two processes on the first node and (0, 2, 3) and (1, 2, 3) to the two on the second node, whereas it should be passing (0, 4), (1, 4), (2, 4), and (3, 4) to example. Check out the multiprocessing.spawn() API doc and the small argument-passing sketch below.
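
To make the argument passing concrete, here is a tiny standalone sketch (show_args is just an illustrative function, not part of the original code) that prints what each spawned process actually receives:

import torch.multiprocessing as mp

def show_args(i, a, b):
    # mp.spawn calls fn(i, *args): i is the local process index (0..nprocs-1),
    # followed by whatever tuple was passed via args.
    print(f"process index {i}, extra args = ({a}, {b})")

if __name__ == "__main__":
    # With args=(0, 1) and nprocs=2, the two children receive
    # (0, 0, 1) and (1, 0, 1) -- matching the explanation above.
    mp.spawn(show_args, args=(0, 1), nprocs=2, join=True)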

To make it work on multiple nodes, you can modify the example signature to something like def example(local_rank, global_rank_offset, world_size), then use global_rank_offset + local_rank as the rank for init_process_group. Then you can spawn it like:

    world_size = 4
    nprocs_per_node = 2     # each node spawns two processes
    global_rank_offset = 0  # this should be 2 for the other machine
    mp.spawn(example,
        args=(global_rank_offset, world_size),
        nprocs=nprocs_per_node,
        join=True)
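
For completeness, a possible sketch of the corresponding example() body (CPU-only with the gloo backend; master_ip and the port are placeholders) that applies the rank offset could be:

import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP

def example(local_rank, global_rank_offset, world_size):
    # Every node must point at the same rendezvous address.
    os.environ['MASTER_ADDR'] = 'master_ip'   # placeholder: the master's IP
    os.environ['MASTER_PORT'] = '12355'

    # Global rank = this node's offset + the local process index.
    rank = global_rank_offset + local_rank
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    model = nn.Linear(10, 10)      # CPU-only, so no .to(rank)
    ddp_model = DDP(model)         # and no device_ids

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 10)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    dist.destroy_process_group()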

If you want to avoid manually configuring the rank, you can try the launch.py script. See this example.

Looks like it’s working :grinning:
Now I want to try it out on the actual problem…
