Connect [127.0.1.1]:[a port]: Connection refused

I am using the batch-updating parameter server (PS) example.
I put the PS and a worker on two different machines. I changed os.environ['MASTER_ADDR'] = 'localhost' and
os.environ['MASTER_PORT'] = '29500' to point to the machine with rank 0.
The PS runs and waits for the worker, but the worker raises this error:
Traceback (most recent call last):
  File "/home/skh018/PycharmProjects/test_DDL_pytorch/worker1.py", line 148, in <module>
    run(1, world_size)
  File "/home/skh018/PycharmProjects/test_DDL_pytorch/worker1.py", line 124, in run
    rpc.init_rpc(
  File "/home/skh018/envs/test_DDL_pytorch/lib/python3.8/site-packages/torch/distributed/rpc/__init__.py", line 90, in init_rpc
    api._init_rpc_backend(backend, store, name, rank, world_size, rpc_backend_options)
  File "/home/skh018/envs/test_DDL_pytorch/lib/python3.8/site-packages/torch/distributed/rpc/api.py", line 293, in _init_rpc_backend
    rpc_agent = backend_registry.init_backend(
  File "/home/skh018/envs/test_DDL_pytorch/lib/python3.8/site-packages/torch/distributed/rpc/backend_registry.py", line 94, in init_backend
    return backend.value.init_backend_handler(*args, **kwargs)
  File "/home/skh018/envs/test_DDL_pytorch/lib/python3.8/site-packages/torch/distributed/rpc/backend_registry.py", line 194, in _tensorpipe_init_backend_handler
    group = _init_process_group(store, rank, world_size)
  File "/home/skh018/envs/test_DDL_pytorch/lib/python3.8/site-packages/torch/distributed/rpc/backend_registry.py", line 117, in _init_process_group
    group = dist.ProcessGroupGloo(store, rank, world_size, process_group_timeout)
RuntimeError: [/opt/conda/conda-bld/pytorch_1595629395347/work/third_party/gloo/gloo/transport/tcp/pair.cc:769] connect [127.0.1.1]:62374: Connection refused

I tried specifying a tcp:// init_method in TensorPipeRpcBackendOptions,
but the error is still there.

Hmm, this log suggests it is still trying to use localhost. Can you confirm that this is different from the IP and port you passed to init_method?

If so, can you check what the following command returns?

getent hosts `hostname`

If it does not return the expected IP, you might need to set the GLOO_SOCKET_IFNAME environment variable to specify the correct network interface. See this doc: Distributed communication package - torch.distributed — PyTorch 2.1 documentation
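A rough Python equivalent of that check, for reference: if this prints 127.0.1.1 or 127.0.0.1, the hostname does not resolve to the machine's routable address, which is exactly the case where GLOO_SOCKET_IFNAME is needed.

import socket

# Rough equivalent of `getent hosts $(hostname)`: what the local
# hostname resolves to from this machine's point of view.
print(socket.gethostname(), "->", socket.gethostbyname(socket.gethostname()))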

Yes, I changed it to the master node:
[screenshot]
I put this on both the master and the other node.
I think the error message is not clear: when I put a dummy IP it runs and just waits, but with the correct IP (the node with rank 0) it returns that error.
For that command, it returns the worker's IP and hostname.

I just changed batch_update_size to 1 and ran the code with rank 0 on one machine and rank 1 on the other. I also changed the master address and port.
I also had to specify the backend explicitly to avoid another error, shown here:
[screenshot]
Any clue what to check? Thanks

Yep, the default backend is ProcessGroup if you are using PyTorch version <= v1.6. We changed the default backend to TensorPipe in v1.7. BTW, which version are you using?

Regarding the connection error, have you tried using the ProcessGroup backend instead of the TensorPipe backend? I would assume it will run into the same problem, but if it does not, that helps narrow things down. cc @lcw
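For reference, a minimal sketch of forcing the ProcessGroup backend (the worker name, rank, and world size here are placeholders; on PyTorch <= 1.6 this backend is already the default):

import torch.distributed.rpc as rpc

# Explicitly select the Gloo-based ProcessGroup backend instead of TensorPipe.
rpc.init_rpc(
    "trainer1",                              # placeholder worker name
    rank=1,                                  # placeholder rank
    world_size=2,                            # placeholder world size
    backend=rpc.BackendType.PROCESS_GROUP,
)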

Yes, I changed it to the master node:

What did you specify for the worker node? And does the Connection refused error only occur on the worker?

I am using version 1.6. I specified the same values on both nodes, as in the figure below, except for GLOO_SOCKET_IFNAME, which I did not set on the master node.
[screenshot]

Now, when I set GLOO_SOCKET_IFNAME on the master node as well, the worker sends back this error:

Both errors occur only on the worker node, and only when the master is running.
I can ping the master node and telnet to its port 2222 from the worker node.
When I changed the backend I got another error; I am checking whether I can solve it.

That seems to be a step in the right direction: even though it shows a “no route” error, it at least tries to use the correct IP. Do you need any permission to access port 49552?

Since it is the client that initiates the connection, this port is random and changes each time.
I checked this with telnet: telnet can open a random port and connect to the master node.
However, I am not an admin or sudoer on these servers. Could this be a problem? Does the code need superuser permission?

IIUC, the code will open random ports on both the master and the worker and then use those ports for communication. So I guess there can be permission issues here? @lcw have you seen anything similar before?

One way to test whether this is an RPC-specific issue or a permission issue is to call init_process_group manually and see if it works. You can use this short DDP example. Since this code path is widely used and better tested than RPC, if it still fails with the same error, I would assume there is some permission/firewall issue that needs to be addressed.
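For example, a minimal sketch along these lines (the master address/port and interface name are placeholders), launched by hand with rank 0 on the master and rank 1 on the worker:

import os
import torch
import torch.distributed as dist

# Placeholders: use the master node's real IP/port and the correct interface.
os.environ["MASTER_ADDR"] = "10.27.1.1"
os.environ["MASTER_PORT"] = "29500"
os.environ["GLOO_SOCKET_IFNAME"] = "eno2"

# Set RANK=0 on the master and RANK=1 on the worker before launching.
rank = int(os.environ.get("RANK", "0"))
dist.init_process_group("gloo", rank=rank, world_size=2)

# A tiny collective to force real traffic between the two nodes.
t = torch.ones(1) * rank
dist.all_reduce(t)          # default op is SUM, so both nodes should print 1.0
print("all_reduce result:", t.item())

dist.destroy_process_group()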

I ran the DDP example. The error is the same: no route to host.
I do not have root privileges on these servers, and it seems these processes try to open further connections after the first connection on the specified port.
Do you know which ports should be open between them? I should ask the administrator.
Is it related to the ports or to root privileges?

Hi, sorry I didn’t answer earlier, I’ll try to catch up with what was said.

As @mrshenli pointed out, the fact that RPC was attempting to use 127.0.1.1 (a localhost address) was probably due to your machine/networking not being set up in a way that allows your hostname to be resolved to a routable IP address. Such a setup is common in cloud providers and computing clusters, but not so much on personal machines. Perhaps this should be clarified. The “workaround” is to use GLOO_SOCKET_IFNAME or TP_SOCKET_IFNAME (for ProcessGroup and TensorPipe, respectively).
(Actually, right now there’s an issue where, if you use TensorPipe, you need to set both of those env vars.)
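Concretely, a minimal sketch of that workaround, set on every node before init_rpc is called ("eno2" here is just a placeholder for the interface that owns the routable IP):

import os

# Placeholders: pick the interface that carries this node's routable IP.
os.environ["GLOO_SOCKET_IFNAME"] = "eno2"   # read by the Gloo / ProcessGroup layer
os.environ["TP_SOCKET_IFNAME"] = "eno2"     # read by the TensorPipe layer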

Another thing to notice is that the IP address and port that you provide in the init_method are not, in fact, the ones used to connect the machines to one another. I realize that sounds a bit silly, but the reason is internal code modularity: the “real” connection cannot extract such information from the init_method, because that could be a file or some other non-IP mechanism. Hence the real connection is established in its own way, which is what led to the localhost address being selected and to port 2222 being “disregarded”.

The way these “real” connections are created is the same in both ProcessGroup and TensorPipe: the “server-side” of the connection opens a listening socket on an arbitrary high port, and then the client connects to that one. The port that is used is assigned automatically by the kernel among all the ones that are currently free. There is no way to predict it, limit it to a specific range, or specify it.

The “no route to host” issue seems to me to be either an issue in the routing tables (but you said you’re able to ping the machine, so that shouldn’t be the case) or a firewall prohibiting connections to the high port numbers that we use. I don’t know of a reliable way of confirming this theory other than asking your system administrator. You can, however, see if you can reproduce the issue using “standard” tools like netcat. In the logs you posted above, the port that was assigned by the kernel is 49552. You could set up one instance of netcat listening on that port, and another (from the other machine) connecting to it. If that works, that would be surprising, and we should look further into what PyTorch is doing differently that prevents it from working.
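If netcat is not available, a rough Python equivalent of that test might look like the sketch below (the master IP and port are placeholders; run the listener on the master first, then the client from the worker):

import socket
import sys

HOST, PORT = "10.27.1.1", 49552   # placeholders: master IP and the port to test

if len(sys.argv) > 1 and sys.argv[1] == "listen":
    # Run this on the master node: wait for one incoming connection.
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.bind(("0.0.0.0", PORT))
    srv.listen(1)
    conn, addr = srv.accept()
    print("accepted connection from", addr)
    conn.close()
else:
    # Run this on the worker node: try to reach the master's port.
    cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    cli.connect((HOST, PORT))     # fails similarly if the port is blocked
    print("connected successfully")
    cli.close()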


Thank you for the explanation. I will test it with the help of our administrator to see how the problem can be sorted out.

The admin set up a direct link between the two nodes.
Now it seems to get further, but I again have errors on both machines.

On the master node:

On the worker node:

I updated my PyTorch to 1.7; now I get this error on the worker node:

As I checked with the nc command, I can open any port and the two nodes can reach each other on it.
Now the worker tries to connect to a random port on the master node that, as far as I could monitor, has not been opened.

I ran the same code on a single machine; this is the netstat output for the established connections:

To reduce the number of variables at play here, let’s maybe start by trying to get the ProcessGroup backend (based on Gloo) to work. (I’m saying this because the TensorPipe backend also uses Gloo internally and thus could fail even if the error didn’t come from TensorPipe directly). Could you try with that backend and see what happens?

Also, I didn’t understand what your system admin did. Did they open up some range of ports? Or maybe just port 49552, which we were mentioning before? Or did they remove the firewall entirely? Only the latter option will work; the former ones will be ineffective, since both Gloo and TensorPipe will use different ports each time they run.

What’s more, Gloo opens many listening sockets, one for every peer it needs to communicate with. Hence having a fixed range of ports (or even just a single one) that Gloo is allowed to use won’t scale.

One last thing: the “connection closed by peer” error seems to suggest that the nodes were initially able to connect but that one of them then crashed (although it could also have other explanations). Were you able to see whether the other node had some different error in its log (which may be the real root cause)?

One way you may be able to circumvent all this, and not require the intervention of your sysadmin, is by setting up some sort of VPN between your two nodes so that they can open whatever ports they like but it would still all pass through the firewall properly. I do not really know how to set such a thing up, and it may require you to use Docker if you need to avoid root privileges. In the end it may get complicated.

Hey @lcw Thank you for your reply.

Our admin set up a direct link between the two servers, so I am now running with just these two servers.
@mrshenli suggested running DDP as a check. I tried the DDP example and it works perfectly.
But I again got the “host unreachable” error for the parameter server example.
I will try your suggestion as well, but since DDP runs correctly, maybe that is a clue for you. I tried it with both the Gloo and NCCL backends and both work.

Just to make sure I got this right: are you saying that DDP with the Gloo backend works correctly, while RPC with the ProcessGroup backend does not work for you?

One thing to pay attention to: the DDP example you linked to only runs on a single machine, without using the network. (It spawns two processes, but they’re both local). So to make sure that DDP indeed works across the network you’ll probably have to change the example a bit and launch the two processes by hand on the two machines.

Yes, DDP runs but RPC has a problem.

No, I just ran it without the spawn function. I ran the example(rank, world_size) function with the respective rank on each machine, so it is running on two machines, and I can see the traffic with monitoring tools.

But RPC still hits that same error.

Hey @lcw, sorry, I had only used DDP to check the Gloo backend. I have now run the RPC example with the ProcessGroup backend and it works.
But the error is still there for the TensorPipe backend.

Could you tell me once more the exact setup you’re using to run the RPC example under ProcessGroup and under TensorPipe? A code pointer would be great, plus the commands you use to launch, including the environment variables you’re setting. And possibly the logs that each of them produces (with the errors). Thanks!

I am running this code on two machines. The code is the same on both, but I call the run(rank, world_size) function with rank 0 on one machine and rank 1 on the other (i.e. run(0, 2) and run(1, 2)). Each machine has two network interfaces, and eno2 on the PS is connected to eno1 on the worker, so in the worker code I change the environment variable to eno1.

import os
import threading
from datetime import datetime

import torch
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn
from torch import optim

import torchvision


batch_size = 20
image_w = 64
image_h = 64
num_classes = 30
batch_update_size = 1
num_batches = 6


def timed_log(text):
    print(f"{datetime.now().strftime('%H:%M:%S')} {text}")


class BatchUpdateParameterServer(object):

    def __init__(self, batch_update_size=batch_update_size):
        self.model = torchvision.models.resnet50(num_classes=num_classes)
        self.lock = threading.Lock()
        self.future_model = torch.futures.Future()
        self.batch_update_size = batch_update_size
        self.curr_update_size = 0
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
        for p in self.model.parameters():
            p.grad = torch.zeros_like(p)

    def get_model(self):
        return self.model

    @staticmethod
    @rpc.functions.async_execution
    def update_and_fetch_model(ps_rref, grads):
        self = ps_rref.local_value()
        timed_log(f"PS got {self.curr_update_size}/{batch_update_size} updates")
        for p, g in zip(self.model.parameters(), grads):
            p.grad += g
        with self.lock:
            self.curr_update_size += 1
            fut = self.future_model

            if self.curr_update_size >= self.batch_update_size:
                for p in self.model.parameters():
                    p.grad /= self.batch_update_size
                self.curr_update_size = 0
                self.optimizer.step()
                self.optimizer.zero_grad()
                fut.set_result(self.model)
                timed_log("PS updated model")
                self.future_model = torch.futures.Future()

        return fut


class Trainer(object):

    def __init__(self, ps_rref):
        self.ps_rref = ps_rref
        self.loss_fn = nn.MSELoss()
        self.one_hot_indices = torch.LongTensor(batch_size) \
                                    .random_(0, num_classes) \
                                    .view(batch_size, 1)

    def get_next_batch(self):
        for _ in range(num_batches):
            inputs = torch.randn(batch_size, 3, image_w, image_h)
            labels = torch.zeros(batch_size, num_classes) \
                        .scatter_(1, self.one_hot_indices, 1)
            yield inputs.cuda(), labels.cuda()

    def train(self):
        name = rpc.get_worker_info().name
        m = self.ps_rref.rpc_sync().get_model().cuda()
        for inputs, labels in self.get_next_batch():
            timed_log(f"{name} processing one batch")
            self.loss_fn(m(inputs), labels).backward()
            timed_log(f"{name} reporting grads")
            m = rpc.rpc_sync(
                self.ps_rref.owner(),
                BatchUpdateParameterServer.update_and_fetch_model,
                args=(self.ps_rref, [p.grad for p in m.cpu().parameters()]),
            ).cuda()
            timed_log(f"{name} got updated model")


def run_trainer(ps_rref):
    trainer = Trainer(ps_rref)
    trainer.train()


def run_ps(trainers):
    timed_log("Start training")
    ps_rref = rpc.RRef(BatchUpdateParameterServer())
    futs = []
    for trainer in trainers:
        futs.append(
            rpc.rpc_async(trainer, run_trainer, args=(ps_rref,))
        )

    torch.futures.wait_all(futs)
    timed_log("Finish training")


def run(rank, world_size):
    os.environ['MASTER_ADDR'] = '10.27.1.1'
    os.environ['MASTER_PORT'] = '49000'
    os.environ['TF_SOCKET_IFNAME'] = 'eno2'
    os.environ['GLOO_SOCKET_IFNAME'] = 'eno2'
    
    if rank != 0:
        rpc.init_rpc(
            f"trainer{rank}",
            rank=rank,
            world_size=world_size,
            backend=rpc.BackendType.TENSORPIPE,
        )
        # trainer passively waiting for ps to kick off training iterations
    else:
        rpc.init_rpc(
            "ps",
            rank=rank,
            world_size=world_size,
            backend=rpc.BackendType.TENSORPIPE,
        )
        run_ps([f"trainer{r}" for r in range(1, world_size)])

    # block until all rpcs finish
    rpc.shutdown()


if __name__=="__main__":
    world_size = batch_update_size + 1
    run(0,world_size) # Rank zero for PS and Rank 1 for Worker


I added these options too; the errors are the same.


options = rpc.TensorPipeRpcBackendOptions(
    num_worker_threads=16,
    rpc_timeout=0,  # infinite timeout
    init_method='tcp://10.27.1.1:49000',
)
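
For completeness, a sketch of how those options would be wired in, assuming they are passed to the same init_rpc calls shown above:

rpc.init_rpc(
    f"trainer{rank}",
    rank=rank,
    world_size=world_size,
    backend=rpc.BackendType.TENSORPIPE,
    rpc_backend_options=options,
)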

Worker error:

Failed to respond to 'Shutdown Proceed' in time, got error EHOSTUNREACH: host is unreachable
Traceback (most recent call last):
  File "worker1.py", line 156, in <module>
    run(1, world_size)
  File "worker1.py", line 151, in run
    rpc.shutdown()
  File "/home/skh018/envs/distributed_learning/lib/python3.8/site-packages/torch/distributed/rpc/api.py", line 78, in wrapper
    return func(*args, **kwargs)
  File "/home/skh018/envs/distributed_learning/lib/python3.8/site-packages/torch/distributed/rpc/api.py", line 284, in shutdown
    _get_current_rpc_agent().join()
RuntimeError: [/opt/conda/conda-bld/pytorch_1603729062494/work/third_party/gloo/gloo/transport/tcp/pair.cc:575] Connection closed by peer [10.27.1.1]:4482

PS error:

[W tensorpipe_agent.cpp:687] RPC agent for ps encountered error when sending outgoing request #0 to trainer1: ECONNREFUSED: connection refused
Traceback (most recent call last):
  File "/home/skh018/PycharmProjects/distribbuted_learning_pytorch/test_DDL_pytorch/ps_new.py", line 157, in <module>
    run(0,world_size)
  File "/home/skh018/PycharmProjects/distribbuted_learning_pytorch/test_DDL_pytorch/ps_new.py", line 149, in run
    run_ps([f"trainer{r}" for r in range(1, world_size)])
  File "/home/skh018/PycharmProjects/distribbuted_learning_pytorch/test_DDL_pytorch/ps_new.py", line 110, in run_ps
    torch.futures.wait_all(futs)
  File "/home/skh018/envs/distribbuted_learning_pytorch/lib/python3.8/site-packages/torch/futures/__init__.py", line 162, in wait_all
    return [fut.wait() for fut in torch._C._collect_all(cast(List[torch._C.Future], futures)).wait()]
RuntimeError: ECONNREFUSED: connection refused

For running, I just use:

python ps.py
python worker.py

I hope I covered all the details.