Can't init rpc on more than one machine

When I use RPC on more than one machine, the code gets stuck in init_rpc. With opt.num_servers == 1 it works as long as opt.num_envs_each_server < 30. I am using Python 3.7.10 and PyTorch 1.8.1. Is there something wrong with this code?

# server
def main():
    """Parse the options and spawn the server-side worker processes.

    With a single server the env workers run on this machine as well, so
    ``num_envs_each_server`` extra processes are started next to the trainer
    processes; otherwise only the trainer processes are started here.
    """
    opt = Options().parse()
    local_env_procs = opt.num_envs_each_server if opt.num_servers == 1 else 0
    mp.spawn(
        run_worker,
        args=(opt,),
        nprocs=NUM_TRAINER_PROCESSES + local_env_procs,
        join=True,
    )


def run_worker(idx, opt):
    """Join the RPC group as the PPO agent (``idx == 0``) or as an environment.

    Process 0 first acts as a small rank server: it accepts ``opt.num_envs``
    TCP connections on ``(opt.address, opt.env_port)`` and answers each one
    with a unique index. Every other process connects to that socket to obtain
    its index before calling ``rpc.init_rpc``.

    Args:
        idx: Process index passed in by ``mp.spawn``.
        opt: Parsed options. Reads ``address``, ``port``, ``env_port``,
            ``num_envs`` and ``world_size``.
    """
    os.environ["MASTER_ADDR"] = opt.address
    os.environ["MASTER_PORT"] = opt.port
    backend = rpc.BackendType.TENSORPIPE
    rpc_backend_options = rpc.TensorPipeRpcBackendOptions(
        num_worker_threads=1, rpc_timeout=60
    )

    if idx == 0:
        # The context managers close the listening socket and every accepted
        # connection, even if an error interrupts the hand-out loop.
        with socket.socket() as listener:
            listener.bind((opt.address, opt.env_port))
            listener.listen(opt.num_envs)
            for i in tqdm.trange(opt.num_envs, ascii=True):
                conn, _ = listener.accept()
                with conn:
                    # sendall() keeps writing until the whole payload is out;
                    # send() may stop after a partial write.
                    conn.sendall(str(i).encode("utf-8"))
            # NOTE(review): presumably gives the clients time to read their
            # rank before the listener goes away - confirm.
            time.sleep(1)

        logger = utils.get_logger("agent")
        logger.info("init rpc for ppo agent")
        rpc.init_rpc(
            "ppo_agent",
            rank=0,
            world_size=opt.world_size,
            backend=backend,
            rpc_backend_options=rpc_backend_options,
        )
    else:
        while True:
            try:
                # A fresh socket per attempt, closed by the context manager
                # whether or not the attempt succeeds.
                with socket.socket() as s:
                    s.connect((opt.address, opt.env_port))
                    rank = int(s.recv(1024).decode())
                break
            except (OSError, ValueError):
                # OSError: the rank server is not reachable yet.
                # ValueError: the connection closed before a rank arrived.
                # Anything else (e.g. KeyboardInterrupt) is left to propagate.
                time.sleep(1)
        logger = utils.get_logger("env_{}".format(rank))
        logger.info("init rpc for env {}".format(rank))
        rpc.init_rpc(
            "env_{}".format(rank),
            rank=NUM_TRAINER_PROCESSES + rank,
            world_size=opt.world_size,
            backend=backend,
            rpc_backend_options=rpc_backend_options,
        )
        logger.info("env {} is waiting".format(rank))

    rpc.shutdown()

# client
def main():
    """Parse the options and spawn one env worker process per local environment."""
    opt = Options().parse()
    env_proc_count = opt.num_envs_each_server
    mp.spawn(
        run_worker,
        args=(opt,),
        nprocs=env_proc_count,
        join=True,
    )


def run_worker(idx, opt):
    """Fetch a rank from the rank server, then join the RPC group as an env.

    The worker keeps retrying once per second until it has received a rank
    from ``(opt.address, opt.env_port)``, then registers itself as
    ``env_<rank>`` with RPC rank ``NUM_TRAINER_PROCESSES + rank``.

    Args:
        idx: Process index passed in by ``mp.spawn``; only used to name the
            logger.
        opt: Parsed options. Reads ``address``, ``port``, ``env_port`` and
            ``world_size``.
    """
    os.environ["MASTER_ADDR"] = opt.address
    os.environ["MASTER_PORT"] = opt.port
    backend = rpc.BackendType.TENSORPIPE
    rpc_backend_options = rpc.TensorPipeRpcBackendOptions(
        num_worker_threads=1, rpc_timeout=60
    )
    logger = utils.get_logger("{}".format(idx))

    while True:
        try:
            # A fresh socket per attempt, closed by the context manager
            # whether or not the attempt succeeds.
            with socket.socket() as s:
                s.connect((opt.address, opt.env_port))
                rank = int(s.recv(1024).decode())
            break
        except (OSError, ValueError) as e:
            # OSError: the rank server is not reachable yet.
            # ValueError: the connection closed before a rank arrived.
            logger.info(e)
            time.sleep(1)

    logger.info("init rpc for env {}".format(rank))
    rpc.init_rpc(
        "env_{}".format(rank),
        rank=NUM_TRAINER_PROCESSES + rank,
        world_size=opt.world_size,
        backend=backend,
        rpc_backend_options=rpc_backend_options,
    )
    logger.info("env {} is waiting".format(rank))

    rpc.shutdown()

Hey @yueyilia, would I be correct in assuming that the world_size is NUM_TRAINER_PROCESSES + 2 * num_envs_each_server? Have you tried increasing the value of num_worker_threads to, say, 128?

BTW, when you replace init_rpc with init_process_group and use the same rank and world_size, port, and, address, does it work?

Hey, thanks for your reply.
Specifically, in my simplified test example, opt.world_size=33, opt.num_servers = 2, opt.num_envs_each_server=16, NUM_TRAINER_PROCESSES=1.
It does not work when modifying the value of num_worker_threads.
When using init_process_group, clients are successfully initialized, but the server still gets stuck in init_process_group.

Hmm, this means some configuration might be wrong (init_process_group has been widely used in multi-machine training). The behavior suggests the client thinks all peers are joined, but the server is still waiting for some peers.

Curious, why do you need to use your own sockets to communicate ranks? Could you please print all ranks and see if those are expected?

If that still doesn’t give a clue, could you please share a min repro?

I plan to use 100 servers to build thousands of environments. I don’t know how to assign a unique rank without sockets. Is there any usage that I don’t know?
In fact, I once ran the same code on PyTorch 1.6, and there was no problem with the startup. However, I could only run up to 1000 environments because of the port limit (see the thread “When I use 1024 nodes in rpc, I meet RuntimeError "listen: Address already in use"”, post #6 by yueyilia). I don’t know why the error occurs now.
I still haven’t found a solution, the following is a min repro.

import os
import time
import tqdm
import socket
import argparse
import logging
import torch
import torch.multiprocessing as mp
import torch.distributed
import torch.distributed.rpc as rpc

# Command-line options shared by the trainer and the env launches.
# The defaults are mutually consistent: num_envs (32) equals
# num_servers (2) * num_envs_each_server (16), and world_size (33) equals
# num_envs + 1 trainer. Nothing enforces this, so keep them in sync when
# overriding any of them.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--mode",
    type=str,
    default="trainer",
    help="'trainer' starts the trainer process; any other value starts env processes",
)
parser.add_argument(
    "--address",
    type=str,
    default="10.90.224.127",
    help="address of the trainer machine (used as MASTER_ADDR and for the rank socket)",
)
parser.add_argument(
    "--port",
    type=str,
    default="10088",
    help="rendezvous port (used as MASTER_PORT)",
)
# The default is now an int so that it matches the declared type.
parser.add_argument(
    "--rank_port",
    type=int,
    default=10099,
    help="port of the socket on which the trainer hands out ranks",
)
parser.add_argument(
    "--num_envs_each_server",
    type=int,
    default=16,
    help="number of env processes spawned by one env launch",
)
parser.add_argument(
    "--num_servers",
    type=int,
    default=2,
    help="number of env machines",
)
parser.add_argument(
    "--num_envs",
    type=int,
    default=32,
    help="total number of env processes the trainer hands ranks to",
)
parser.add_argument(
    "--world_size",
    type=int,
    default=33,
    help="world size passed to init_rpc",
)
opt = parser.parse_args()


def main():
    """Spawn the worker processes for the selected mode.

    Trainer mode starts a single process; any other mode starts
    ``opt.num_envs_each_server`` env processes.
    """
    proc_count = 1 if opt.mode == "trainer" else opt.num_envs_each_server
    mp.spawn(
        run_worker,
        args=(opt,),
        nprocs=proc_count,
        join=True,
    )


def run_worker(idx, opt):
    """Join the RPC group as the trainer or as one environment.

    In trainer mode the process first hands out ranks: it accepts
    ``opt.num_envs`` TCP connections on ``(opt.address, opt.rank_port)`` and
    answers each one with a unique index, then joins the RPC group as
    ``ppo_agent`` with rank 0. In any other mode the process fetches an index
    from that socket and joins as ``env_<index>`` with rank ``1 + index``.

    Args:
        idx: Process index passed in by ``mp.spawn`` (unused).
        opt: Parsed options. Reads ``mode``, ``address``, ``port``,
            ``rank_port``, ``num_envs`` and ``world_size``.
    """
    os.environ["MASTER_ADDR"] = opt.address
    os.environ["MASTER_PORT"] = opt.port
    backend = rpc.BackendType.TENSORPIPE
    rpc_backend_options = rpc.TensorPipeRpcBackendOptions(
        num_worker_threads=1, rpc_timeout=60
    )

    if opt.mode == "trainer":
        # The context managers close the listening socket and every accepted
        # connection, even if an error interrupts the hand-out loop.
        with socket.socket() as listener:
            listener.bind((opt.address, opt.rank_port))
            listener.listen(opt.num_envs)
            for i in tqdm.trange(opt.num_envs, ascii=True):
                conn, _ = listener.accept()
                with conn:
                    # sendall() keeps writing until the whole payload is out;
                    # send() may stop after a partial write.
                    conn.sendall(str(i).encode("utf-8"))
            # NOTE(review): presumably gives the clients time to read their
            # rank before the listener goes away - confirm.
            time.sleep(1)

        logger = get_logger("agent")
        logger.info("init rpc for ppo agent")
        rpc.init_rpc(
            "ppo_agent",
            rank=0,
            world_size=opt.world_size,
            backend=backend,
            rpc_backend_options=rpc_backend_options,
        )
        # Alternative used while debugging the hang:
        # torch.distributed.init_process_group(
        #     backend="gloo", rank=0, world_size=opt.world_size,
        # )
        logger.info("end")
    else:
        while True:
            try:
                # A fresh socket per attempt, closed by the context manager
                # whether or not the attempt succeeds.
                with socket.socket() as s:
                    s.connect((opt.address, opt.rank_port))
                    rank = int(s.recv(1024).decode())
                break
            except (OSError, ValueError):
                # OSError: the trainer is not reachable yet.
                # ValueError: the connection closed before a rank arrived.
                # Anything else (e.g. KeyboardInterrupt) is left to propagate.
                time.sleep(1)
        logger = get_logger("env_{}".format(rank))
        logger.info("init rpc for env {}".format(rank))
        rpc.init_rpc(
            "env_{}".format(rank),
            rank=1 + rank,
            world_size=opt.world_size,
            backend=backend,
            rpc_backend_options=rpc_backend_options,
        )
        # Alternative used while debugging the hang:
        # torch.distributed.init_process_group(
        #     backend="gloo", rank=1 + rank, world_size=opt.world_size,
        # )
        logger.info("env {} is waiting".format(rank))

    rpc.shutdown()


def get_logger(name="", level=logging.INFO, stream=True, file=None):
    """Return the logger called *name*, configured with the requested handlers.

    Calling this function again for the same logger does not attach a second
    copy of a handler it already has, so log lines are not duplicated.

    Args:
        name: Logger name; also used as the base name of the log file.
        level: Level set on the logger.
        stream: If true, attach a ``StreamHandler`` with a short format.
        file: Optional directory. If given, also log to ``<file>/<name>.log``
            through a rotating handler (100 MiB per file, one backup).

    Returns:
        The configured ``logging.Logger``.
    """
    # ``import logging`` alone does not load the ``logging.handlers``
    # submodule, so the handler class is imported explicitly.
    from logging.handlers import RotatingFileHandler

    try:
        import absl.logging

        logging.root.removeHandler(absl.logging._absl_handler)
        absl.logging._warn_preinit_stderr = False
    except Exception as e:
        # Best effort: absl may be missing or laid out differently.
        print("failed to fix absl logging bug", e)

    logger = logging.getLogger(name)
    logger.setLevel(level)

    if stream:
        # Exact type check on purpose: file handlers subclass StreamHandler
        # and must not count as the console handler.
        has_stream = any(type(h) is logging.StreamHandler for h in logger.handlers)
        if not has_stream:
            stream_handler = logging.StreamHandler()
            stream_handler.setFormatter(
                logging.Formatter("%(asctime)s - %(message)s")
            )
            logger.addHandler(stream_handler)

    if file:
        path = os.path.join(file, name + ".log")
        # FileHandler stores its target as an absolute path in baseFilename.
        target = os.path.abspath(path)
        has_file = any(
            isinstance(h, RotatingFileHandler) and h.baseFilename == target
            for h in logger.handlers
        )
        if not has_file:
            file_handler = RotatingFileHandler(
                path,
                mode="a",
                maxBytes=100 * 1024 * 1024,
                backupCount=1,
                encoding="utf-8",
            )
            file_handler.setFormatter(
                logging.Formatter(
                    "%(asctime)s %(levelname)s [%(filename)s: %(lineno)d] [%(processName)s: %(process)d] - %(message)s"
                )
            )
            logger.addHandler(file_handler)

    return logger


# Run main() only when this file is executed as a script, not when it is imported.
# NOTE(review): torch.multiprocessing.spawn defaults to the "spawn" start
# method, so child processes presumably re-import this module; the guard then
# keeps them from calling main() again - confirm.
if __name__ == "__main__":
    main()

pulling in store rendezvous expert @H-Huang :slight_smile:

Hi @yueyilia, I tried your code locally (python 3.8.5 and nightly build of pytorch which is 1.10 now), it worked for me and is able to get past init_rpc. I think the issue may be with how your cluster of machines is configured where they cannot establish connections with each other, but I’m not exactly sure.

You can modify your script to use torchelastic via torch.distributed.run (Elastic Launch) — PyTorch master documentation. This essentially does the work you are currently doing by hand: all nodes rendezvous, and a rank is then assigned to each process, with added fault tolerance and elasticity if you choose to enable them. You need to modify your script so that it acts as a single process, then run the command:

python -m torch.distributed.run
    --nnodes=$NUM_NODES
    --nproc_per_node=$NUM_TRAINERS
    --rdzv_id=$JOB_ID
    --rdzv_backend=c10d
    --rdzv_endpoint=$HOST_NODE_ADDR
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

The rank is assigned automatically by torchelastic. Each process can read its own rank from an environment variable (e.g. rank = int(os.environ["RANK"]); environment variables are always strings, so convert the value to int). Here is the list of the environment variables that get passed into your script: torch.distributed.run (Elastic Launch) — PyTorch master documentation