Multi-node model parallelism with PyTorch

Hi, I want to do something exactly the same as that outlined in this paper https://arxiv.org/pdf/1903.11314.pdf under the model parallelism section.

“3.2.2 Model Parallelism. In model parallelism, the DL model is split, and each worker loads a
different part of the DL model for training (see Figure 5). The worker(s) that hold the input layer of
the DL model are fed with the training data. In the forward pass, they compute their output signal
which is propagated to the workers that hold the next layer of the DL model. In the backpropagation
pass, gradients are computed starting at the workers that hold the output layer of the DL model,
propagating to the workers that hold the input layers of the DL model.”

How would I implement the following “toy” example -

  1. The “input layer” as described in the paper will be on my laptop, and only my laptop will contain the training data.

  2. The rest of the layers will be on AWS - apart from the training labels, I do not want to pass in any training data to my AWS instance in this toy example.

Has someone does this? Is there any code someone can point me to look at? I understand I will have to use torch.distributed’s modules and rpc, but is there a reference I can look at?

Hey @mndl1, yep, looks like this P2P comm scenarios can be implemented using RPC. There is no tutorial showing how to handle this exact use case, but here are the RPC tutorials: PyTorch Distributed Overview — PyTorch Tutorials 1.13.1+cu117 documentation

Hi Shen Li,

Thanks for your reply - I am getting stuck on something, and I think it is the way I set MASTER_ADDR and MASTER_PORT.

Below are the simple scripts I’m running - I am trying to send an instruction from my laptop to the EC2 instance to do a simple torch.add between 2 tensors

on my laptop -

# script.py

import torch
import torch.distributed.rpc as rpc
import os

laptop_host = "xx.xx.xx.xx"  # with my laptop's actual public ip
aws_host = "xx.xx.xx.xx". # with my ec2 instance's actual public ip
port = "50051"
os.environ['MASTER_ADDR'] = aws_host
os.environ['MASTER_PORT'] = port
rpc.init_rpc("worker0", rank=0, world_size=2)
print("init success")
ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
print("worker1 success")
rpc.shutdown()

on aws ec2 instance -

# script.py

import torch.distributed.rpc as rpc
import os

laptop_host = "xx.xx.xx.xx"  # with my laptop's actual public ip
aws_host = "xx.xx.xx.xx". # with my ec2 instance's actual public ip
port = "50051"
os.environ['MASTER_ADDR'] = laptop_host
os.environ['MASTER_PORT'] = port
rpc.init_rpc("worker1", rank=1, world_size=2)
print("init success")
rpc.shutdown()

Is this the correct way of doing this? I keep getting the following error on my AWS EC2 instance -

[W socket.cpp:601] [c10d] The client socket has failed to connect to [::ffff:XX.XX.XX.XX]:50051 (errno: 110 - Connection timed out).

where XX.XX.XX.XX is my laptop’s public IP address. I have ruled out security rules as an issue, because I can set up a grpc server/client and communicate with my EC2 instance from my laptop using GRPC.

Hi @mrshenli I have been trying to solve this, and I think the closes issue someone has faced is here - Strange behaviour of GLOO tcp transport

I tried to do something similar to the code snippets in that issue. Below are my code snippets

On my EC2 instance (rank 0)

import torch.distributed as dist
import sys


if len(sys.argv) < 3:
    raise Exception("please enter host and port")

host = sys.argv[1]
port = sys.argv[2]
init_method = f"tcp://{host}:{port}"
print(f"init_method = {init_method}")

# on rank 0
dist.init_process_group(
    backend="gloo", init_method=init_method, rank=0, world_size=2
)

On my laptop (i.e. node of rank 1, and it is a Mac)

import torch.distributed as dist
import sys

if len(sys.argv) < 3:
    raise Exception("please enter host and port")

host = sys.argv[1]
port = sys.argv[2]
init_method = f"tcp://{host}:{port}"
print(f"init_method = {init_method}")

# on rank 1
dist.init_process_group(
    backend="gloo", init_method=init_method, rank=1, world_size=2
)

nothing complicated at all.

Now, on my ec2 instance (node of rank 0) I run the script with the following command -

python script.py localhost 50051

and on my laptop (node of rank1 - it is a Mac), I run the script with the following command

python script.py <Public IP of EC2 instance>  50051

The moment I do this, I get the following error on my EC2 instance -

File "/home/ubuntu/miniconda3/lib/python3.10/site-packages/torch/distributed/distributed_c10d.py", line 862, in _new_process_group_helper
    pg = ProcessGroupGloo(prefix_store, group_rank, group_size, timeout=timeout)
RuntimeError: [/opt/conda/conda-bld/pytorch_1670525493953/work/third_party/gloo/gloo/transport/tcp/pair.cc:211] address family mismatch

and naturally, on my laptop, I get a Connection reset by peer error.

I do not know what to do. I have tried various permutations of the host when running the script on the ec2 instance - python script.py < Public IP of EC2 instance> 50051, or python script.py <Private IP of EC2 instance> 50051, I still get the same error the moment I run the command on my laptop to connect to the ec2 instance.

The error is coming from here in C++ code gloo/pair.cc at 4a5e339b764261d20fc409071dc7a8b8989aa195 · facebookincubator/gloo · GitHub, but the error message don’t provide any more information about the address family mismatch.

Why is this address mismatch happening, and are there any solutions to get around this?

It seems in this thread over here Error: address family mismatch - #4 by lcw, someone faced something similar, and it seems connecting a Linux machine to a MacOS may not be something that is possible. In any case, it will be good if there is more verbose logging of the “address family mismatch” error