PyTorch distributed example code hang. Deadlock?

Hi,

I am trying to run the example code from the pytorch distributed tutorial (dist_tuto.html).

Here is my exact code:

import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process


def run(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        dist.send(tensor=tensor, dst=1)
    else:
        dist.recv(tensor=tensor, src=0)
    print("Rank ", rank, " has data ", tensor[0])


def init_process(rank, size, fn, backend="gloo"):
    """ Initialize the distributed environment. """
    os.environ["MASTER_ADDR"] = "127.0.0.0"
    os.environ["MASTER_PORT"] = "29511"
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

In the terminal, I run it with python run.py. It runs successfully both on my server and on my friend’s Mac, but it hangs forever on my own Mac. I tried restarting my Mac and reinstalling Python (v3.8.2) and torch (v1.5.0). No success.

If I interrupt it with Control-C, the last lines of the traceback are:

  File "/usr/local/anaconda3/envs/p382/lib/python3.8/subprocess.py", line 1762, in _try_wait
    (pid, sts) = os.waitpid(self.pid, wait_flags)

I suspect there is a deadlock. Is there any way to avoid it?

Hey @Hao_Yuan

A few things could cause this issue:

  1. 127.0.0.0 is not a valid IP in some environments. Could you please check? If it is indeed invalid, can you try 127.0.0.1, localhost, or another valid IP address?
  2. Some other process might be occupying that port. Can you try a different port number? (A sketch of both changes is included below.)
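
For reference, here is a minimal sketch of both suggestions applied to the tutorial example, keeping everything else unchanged (127.0.0.1 is the standard loopback address, and 29512 is just an arbitrary example of a free port):

import os
import torch.distributed as dist


def init_process(rank, size, fn, backend="gloo"):
    """ Initialize the distributed environment. """
    os.environ["MASTER_ADDR"] = "127.0.0.1"  # valid loopback address instead of 127.0.0.0
    os.environ["MASTER_PORT"] = "29512"      # any free port works; 29512 is only an example
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)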

Hi @mrshenli.

Somehow, my Mac is behaving better today. I got a warning and then the results:

Warning: Unable to resolve hostname to a (local) address. Using the loopback address as fallback. Manually set the network interface to bind to with GLOO_SOCKET_IFNAME. (operator() at ../torch/lib/c10d/ProcessGroupGloo.cpp:496)
Warning: Unable to resolve hostname to a (local) address. Using the loopback address as fallback. Manually set the network interface to bind to with GLOO_SOCKET_IFNAME. (operator() at ../torch/lib/c10d/ProcessGroupGloo.cpp:496)
Rank  1  has data  tensor(1.)
Rank  0  has data  tensor(1.)

I think the issue is that it takes a long time to resolve the hostname. After export GLOO_SOCKET_IFNAME=en0, I got the result quickly.
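
If you prefer setting it inside the script instead of the shell, setting the environment variable before dist.init_process_group should have the same effect. This is just a sketch; en0 happens to be the right interface on my Mac, so check ifconfig for the right name on your machine:

import os

# Gloo reads GLOO_SOCKET_IFNAME when the process group is created,
# so this has to run before dist.init_process_group()
os.environ["GLOO_SOCKET_IFNAME"] = "en0"  # replace with your own interface name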

Curious, how is the hostname configured in your env? Can you try:

getent hosts `hostname`

There is no getent command on my Mac. I googled a similar command:

$ dscacheutil -q host -a name 'hostname'

It does not show any result.

I see. The hostname wasn’t configured, so Gloo was unable to resolve it when GLOO_SOCKET_IFNAME wasn’t set.

@mrshenli @Hao_Yuan but how did you configure it? I also get the same error, and neither 127.0.0.1 nor localhost works for me…

@ptrblck sorry for calling you out like this…but I was wondering, is there an explanation of what the error:

[W ProcessGroupGloo.cpp:558] Warning: Unable to resolve hostname to a (local) address. Using the loopback address as fallback. Manually set the network interface to bind to with GLOO_SOCKET_IFNAME. (function operator())

means?


Just in case you need it, here is a self-contained example causing the error:

import os
from typing import Tuple, List

import torch
from torch import nn, optim, Tensor
from torch.distributed import rpc
import torch.distributed.autograd as dist_autograd
from torch.distributed.optim import DistributedOptimizer
from torch.multiprocessing import Pool, Process
import torch.multiprocessing as mp

world_size = 2  # one master process plus one worker process
num_epochs = 1  # this doesn't really matter, we only need to test if it can process a big batch and a small batch
iters = 5  # iters for 1 epoch
Din, Dout = 10, 5

class ToyModel(nn.Module):

    def __init__(self):
        super().__init__()
        self.lin = nn.Linear(Din, Dout)
        self.criterion = nn.MSELoss()

    def forward(self, batch_x, batch_y):
        y_ = self.lin(batch_x)
        loss = self.criterion(batch_y, y_)
        return loss

def get_ast_batch(batch_size: int) -> List[Tuple[Tensor]]:
    """
    Returns a list of size batch_size with each individual example.
    - 1 example in a batch is a task with K examples with dim=D.

    Note:
        num_proc = 3
        batch_size = 8  # chunk_size = 8 // 3 = 2 with remainder 2 (three chunks of size 2, and 2 examples left over)
        batch_size = 2  # chunk_size = 2 // 3 = 0, so each example is sent to its own process instead

    :return:
    """
    data_x, data_y = torch.randn(Din), torch.randn(Dout)
    batch = [(data_x, data_y) for _ in range(batch_size)]
    return batch

def get_meta_batch(batch_size: int, k=15) -> Tuple[Tensor]:
    """
    Returns Tuple(torch.Tensor([B, K, D])) where each element in the batch is a task.
    - 1 example in a batch is a task with K examples with dim=D.
    :return:
    """
    data_x, data_y = torch.randn(batch_size, k, Din), torch.randn(batch_size, k, Dout)
    batch = data_x, data_y
    return batch

class Worker:
    def __init__(self, args, kwargs):
        self.args = args
        self.kwargs = kwargs
        self.id = rpc.get_worker_info().id
        # self.env.seed(args.seed)

class Master:

    def __init__(self, world_size):
        self.world_size = world_size
        self.master_rank = 0
        self.num_workers = world_size - 1
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = '29500'

        rpc.init_rpc(f'worker{self.master_rank}', rank=self.master_rank, world_size=world_size)
        self.model = ToyModel()
        self.optimizer = optim.Adam(self.model.parameters(), lr=1e-2)
        self.saved_losses = {}
        # create RRefs (remote references) for making rpc calls
        self.workers = []
        for worker_rank in range(1, world_size):
            worker_info = rpc.get_worker_info(worker_name=f'worker{worker_rank}')
            # Make a remote call to run func on worker to and return an RRef to the result value immediately.
            # worker_rrf = rpc.remote(to=worker_info, func=Worker, args='args', kwargs='kwargs')
            self.workers.append(worker_info)
            #
            self.saved_losses[worker_info.id] = []

    def forward_parallel(self, batch):
        """
        :param batch:
            - List[Tensor([B, K, D])]
            - List[Tuple[Tensor(D), Tensor(D)]]
        """
        # batch_size = len(batch)
        batch_size = batch.size(0)  # num_tasks
        Sx, Sy = batch
        chunk_size = batch_size // self.num_workers  # the number of examples to give each worker/proc
        futures = []
        if chunk_size <= 0:
            # give each worker a data point and thats it
            for t in range(batch_size):
                worker_info = self.workers[t]
                # makes a non-blocking rpc call on the worker and immediately returns a future object to wait on for the result
                future = rpc.rpc_async(to=worker_info, func=self.model.forward, args=(Sx[t], Sy[t]))
                futures.append(future)
        else:
            # each worker receives a chunk of size chunk_size
            chunk_idx = 0
            for worker_idx in range(self.num_workers):
                chunk_x = Sx[chunk_idx:chunk_idx+chunk_size]
                chunk_y = Sy[chunk_idx:chunk_idx+chunk_size]
                chunk_idx += chunk_size
                worker_info = self.workers[worker_idx]
                future = rpc.rpc_async(to=worker_info, func=self.model.forward, args=(chunk_x, chunk_y))
                futures.append(future)
        loss = 0
        for future in futures:
            chunk_loss = future.wait()
            # accumulate the average loss over the chunks as each future completes
            loss += (1.0 / batch_size) * chunk_loss
        return loss

    def finish(self):
        def end_master():
            rpc.shutdown()
        master_proc = Process(target=end_master, args=())
        master_proc.start()

def run_worker_process(rank, world_size):
    print('-- Worker run -- ')
    print(f'current process: {mp.current_process()}')
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    # other ranks are the workers (1 to world_size)
    rpc.init_rpc(f'worker{rank}', rank=rank, world_size=world_size)
    # block until all rpcs finish, and shutdown the RPC instance
    print('running shutdown')
    rpc.shutdown()
    print(f'shutdown complete rank {rank}')
    print(f'shutdown complete for process: {mp.current_process()}')

def master(world_size):
    print('-- Master run -- ')
    print(f'current process: {mp.current_process()}')
    # -- init master process --

    master = Master(world_size=world_size)

    # -- do training loop --
    optimizer = optim.Adam(params=master.model.parameters(), lr=0.1)
    # optimizer = DistributedOptimizer(optim.Adam, )
    for epoch in range(num_epochs):
        for batch_idx in range(iters):
            batch = get_ast_batch(batch_size=batch_idx)
            with dist_autograd.context() as context_id:
                loss = master.forward_parallel(batch)
                dist_autograd.backward(context_id, loss)
            optimizer.step()
            optimizer.zero_grad()

    # block until all rpcs finish, and shutdown the RPC instance, once master gets here everything is shut down
    # rpc.shutdown()
    master.finish()


if __name__ == '__main__':
    print('starting __main__')
    for rank in range(1, world_size):
        worker_proc = Process(target=run_worker_process, args=(rank, world_size))
        print(f'creating process object serially: pro_obj is = {worker_proc}')
        # worker_proc.start()
    master(world_size=2)
    print('Done!\a\n')

Solved it by making sure each of my rpc-initialized things has a different port number. I guess initializing an rpc vs a distributed group is different…?


related: python - How does one set the pytorch distributed hostname, port and GLOO_SOCKET_IFNAME so that DDP works? - Stack Overflow

I’m not familiar with GLOO, but this post might be helpful.

Hey @Brando_Miranda, you can configure which network interface to use by setting the GLOO_SOCKET_IFNAME env var.

sorry for calling you out like this…but I was wondering, is there an explanation of what the error:

It means Gloo tried to resolve the value returned by hostname but failed, and it recommends setting the GLOO_SOCKET_IFNAME env var to configure the network interface explicitly.
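
Roughly speaking (this is only an illustration, not the actual Gloo code), the check it performs is similar to:

import socket

hostname = socket.gethostname()
try:
    # roughly what the rendezvous attempts: map the hostname to a local address
    addr = socket.gethostbyname(hostname)
    print(f"{hostname} resolves to {addr}")
except socket.gaierror:
    # when this fails, Gloo falls back to the loopback address and prints that warning;
    # setting GLOO_SOCKET_IFNAME picks the network interface explicitly instead
    print(f"{hostname} does not resolve")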

Solved it by making sure each of my rpc-initialized things has a different port number.

Hmm, do you mean the master and workers are using different port numbers? I would assume this won’t work, as they use the master port and IP to conduct the rendezvous and find each other. So if they are configured with different port numbers, they won’t be able to find each other.

I guess initializing an rpc vs a distributed group is different…?

RPC currently uses distributed group (ProcessGroup) internally, so the same initialization configuration should work for both.
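
For example, a minimal sketch of the intended setup, where every rank (master and workers alike) points at the same rendezvous address and port; the address, port number, and worker names below are just placeholders:

import os
from torch.distributed import rpc


def init_rpc_for_rank(rank, world_size):
    # every rank must use the same MASTER_ADDR and MASTER_PORT,
    # otherwise the rendezvous cannot connect them to each other
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"
    rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size)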

Hi, thanks for your replies!

I keep getting the error:

[W ProcessGroupGloo.cpp:558] Warning: Unable to resolve hostname to a (local) address. Using the loopback address as fallback. Manually set the network interface to bind to with GLOO_SOCKET_IFNAME. (function operator())

I am unsure why.

My setting is this:

        # set up the master's ip address so this child process can coordinate
        # os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_ADDR'] = 'localhost'
        os.environ['MASTER_PORT'] = '12355'

        # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
        if torch.cuda.is_available():
            backend = 'nccl'
        # Initializes the default distributed process group, and this will also initialize the distributed package.
        dist.init_process_group(backend, rank=rank, world_size=world_size)

Does this look wrong to you? Is your main suggestion to change the port number?

The recommendation is to check what network interface your machine uses by running something like ifconfig. Then set the environment variable GLOO_SOCKET_IFNAME=eth0 (if eth0 is the right network interface for your host).
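
If it helps, here is one way to do that check from Python (socket.if_nameindex is in the standard library, but Unix-only); the eth0 below is only a placeholder for whatever name your machine reports:

import os
import socket

# list (index, name) pairs for the host's network interfaces,
# e.g. [(1, 'lo'), (2, 'eth0')] on Linux or [(1, 'lo0'), (4, 'en0')] on a Mac
print(socket.if_nameindex())

# bind Gloo to the interface that carries your traffic; this has to be set
# before init_process_group / init_rpc creates the Gloo backend
os.environ["GLOO_SOCKET_IFNAME"] = "eth0"  # replace with your interface name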

What does eth0 mean?

Thanks for the reply!

eth0 is just an example, but it is a very common name used for network interfaces. You need to look up the right network interface name for your machine and use it in place of eth0 in my example above.