RPC behavior difference between PyTorch 1.7.0 and 1.9.0

Hello,

I have a use case where I create one process per available GPU along with multiple (e.g. 15) processes that only run on the CPU. Below is a minimal working example that works in PyTorch 1.7.0 but fails in 1.9.0. However, if I use 3 or fewer GPUs rather than 4, while keeping the number of CPU processes the same, it works on 1.9.0 too.
Could you please guide me towards why this is happening and how it can be resolved?
Thanks!

Code:

import os
import time
import torch
torch.multiprocessing.set_sharing_strategy('file_system')
import torch.multiprocessing as mp
import torch.distributed.rpc as rpc

no_of_saver_processes = 15
world_size = torch.cuda.device_count()

def cpu_process_initialization(rank):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '9867'
    rpc.init_rpc(f"{rank}",
                 rank=rank,
                 world_size = world_size + no_of_saver_processes,
                 backend=rpc.BackendType.TENSORPIPE,
                 rpc_backend_options=rpc.TensorPipeRpcBackendOptions(rpc_timeout=0,
                                                                     init_method='env://')
                 )
    print(f"Started CPU process {rank}")
    print(f"Process {rank}: avaialable device {torch.cuda.current_device()}")
    # Do something rather than sleeping example disk or cpu bound operations
    time.sleep(30)
    rpc.shutdown()
    return

def cuda_process_initialization(rank):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '9867'
    rpc.init_rpc(f"{rank}",
                 rank=rank,
                 world_size=world_size + no_of_saver_processes,
                 backend=rpc.BackendType.TENSORPIPE,
                 rpc_backend_options=rpc.TensorPipeRpcBackendOptions(#num_send_recv_threads=args.world_size*3,
                                                                     rpc_timeout=0,
                                                                     init_method='env://')
                 )
    torch.cuda.set_device(rank)
    os.environ["CUDA_VISIBLE_DEVICES"] = f"{rank}"
    print(f"Started CUDA process on gpu {rank}")
    # Do some cuda operations
    print(f"Process {rank}: avaialable device {torch.cuda.current_device()}")
    time.sleep(30)
    rpc.shutdown()
    return

if __name__ == "__main__":
    mp.set_start_method('forkserver', force=True)
    trainer_processes = mp.spawn(cuda_process_initialization,
                                 nprocs=world_size,
                                 join=False)
    cpu_processes = []
    for rank in range(world_size,world_size+no_of_saver_processes):
        p = mp.Process(target=cpu_process_initialization,
                       args=(rank,))
        p.start()
        cpu_processes.append(p)
    for p in cpu_processes: p.join()
    trainer_processes.join()

Error:

terminate called after throwing an instance of 'std::runtime_error'
  what():  In handleEventInFromLoop at tensorpipe/transport/shm/connection_impl.cc:235 "errCouldn't access ringbuffer of connection outbox: fstat: Bad file descriptor (this error originated at tensorpipe/common/shm_segment.cc:153)"
[W tensorpipe_agent.cpp:653] RPC agent for 4 encountered error when accepting incoming pipe: async error on socket: Connection reset by peer (this error originated at tensorpipe/transport/shm/connection_impl.cc:187)
[W tensorpipe_agent.cpp:843] RPC agent for 4 encountered error when reading incoming request from 0: async error on socket: Connection reset by peer (this error originated at tensorpipe/transport/shm/connection_impl.cc:187)
[W tensorpipe_agent.cpp:653] RPC agent for 2 encountered error when accepting incoming pipe: async error on socket: Connection reset by peer (this error originated at tensorpipe/transport/shm/connection_impl.cc:187)
[W tensorpipe_agent.cpp:843] RPC agent for 2 encountered error when reading incoming request from 0: async error on socket: Connection reset by peer (this error originated at tensorpipe/transport/shm/connection_impl.cc:187)
[W tensorpipe_agent.cpp:843] RPC agent for 1 encountered error when reading incoming request from 0: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
[W tensorpipe_agent.cpp:843] RPC agent for 6 encountered error when reading incoming request from 0: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
[W tensorpipe_agent.cpp:843] RPC agent for 8 encountered error when reading incoming request from 0: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
[W tensorpipe_agent.cpp:843] RPC agent for 3 encountered error when reading incoming request from 0: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
[W tensorpipe_agent.cpp:843] RPC agent for 7 encountered error when reading incoming request from 0: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
[W tensorpipe_agent.cpp:843] RPC agent for 5 encountered error when reading incoming request from 0: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
[W tensorpipe_agent.cpp:1049] RPC agent for 7 encountered error when reading incoming response from 0: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
[W tensorpipe_agent.cpp:1049] RPC agent for 1 encountered error when reading incoming response from 0: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
[W tensorpipe_agent.cpp:1049] RPC agent for 8 encountered error when reading incoming response from 0: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
Failed to respond to 'Shutdown Proceed' in time, got error eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
Failed to respond to 'Shutdown Proceed' in time, got error eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
Failed to respond to 'Shutdown Proceed' in time, got error eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
Process Process-9:
Traceback (most recent call last):
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/site-packages/torch/distributed/rpc/backend_registry.py", line 317, in _tensorpipe_init_backend_handler
    agent.join()
RuntimeError: [/opt/conda/conda-bld/pytorch_1623448255797/work/third_party/gloo/gloo/transport/tcp/pair.cc:589] Read error [127.0.0.1]:39602: Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/net/home/store/home/user/The/Feature_Distribution/test_mp.py", line 14, in cpu_process_initialization
    rpc.init_rpc(f"{rank}",
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/site-packages/torch/distributed/rpc/__init__.py", line 203, in init_rpc
    _init_rpc_backend(backend, store, name, rank, world_size, rpc_backend_options)
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/site-packages/torch/distributed/rpc/__init__.py", line 237, in _init_rpc_backend
    rpc_agent = backend_registry.init_backend(
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/site-packages/torch/distributed/rpc/backend_registry.py", line 99, in init_backend
    return backend.value.init_backend_handler(*args, **kwargs)
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/site-packages/torch/distributed/rpc/backend_registry.py", line 319, in _tensorpipe_init_backend_handler
    api.shutdown()
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/site-packages/torch/distributed/rpc/api.py", line 79, in wrapper
    return func(*args, **kwargs)
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/site-packages/torch/distributed/rpc/api.py", line 313, in shutdown
    _get_current_rpc_agent().join(shutdown=True)
RuntimeError: [/opt/conda/conda-bld/pytorch_1623448255797/work/third_party/gloo/gloo/transport/tcp/pair.cc:589] Read error [127.0.0.1]:39602: Connection reset by peer
[W tensorpipe_agent.cpp:1025] RPC agent for 6 encountered error when sending outgoing request #1 to 0: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
Failed to respond to 'Shutdown Proceed' in time, got error eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
Process Process-8:
Traceback (most recent call last):
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/site-packages/torch/distributed/rpc/backend_registry.py", line 317, in _tensorpipe_init_backend_handler
    agent.join()
RuntimeError: [/opt/conda/conda-bld/pytorch_1623448255797/work/third_party/gloo/gloo/transport/tcp/pair.cc:589] Read error [127.0.0.1]:6868: Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/net/home/store/home/user/The/Feature_Distribution/test_mp.py", line 14, in cpu_process_initialization
    rpc.init_rpc(f"{rank}",
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/site-packages/torch/distributed/rpc/__init__.py", line 203, in init_rpc
    _init_rpc_backend(backend, store, name, rank, world_size, rpc_backend_options)
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/site-packages/torch/distributed/rpc/__init__.py", line 237, in _init_rpc_backend
    rpc_agent = backend_registry.init_backend(
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/site-packages/torch/distributed/rpc/backend_registry.py", line 99, in init_backend
    return backend.value.init_backend_handler(*args, **kwargs)
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/site-packages/torch/distributed/rpc/backend_registry.py", line 319, in _tensorpipe_init_backend_handler
    api.shutdown()
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/site-packages/torch/distributed/rpc/api.py", line 79, in wrapper
    return func(*args, **kwargs)
  File "/home/user/anaconda3/envs/pytorch1.9/lib/python3.9/site-packages/torch/distributed/rpc/api.py", line 313, in shutdown
    _get_current_rpc_agent().join(shutdown=True)
RuntimeError: [/opt/conda/conda-bld/pytorch_1623448255797/work/third_party/gloo/gloo/transport/tcp/pair.cc:589] Read error [127.0.0.1]:6868: Connection reset by peer

cc @lcw mrshenli regarding TensorPipe RPC, is this expected?

Thanks! fixing the tag @mrshenli

I confirm that I can reproduce the same behavior: 3 CUDA processes work but 4 do not, with 15 CPU processes.

Could you please guide me towards why this is happening and how it can be resolved?

The difference between 1.9 and 1.7 is that we introduced CUDA support in TensorPipe RPC in v1.9; see the tutorial Direct Device-to-Device Communication with TensorPipe CUDA RPC in the PyTorch documentation.
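
For context, the device-to-device feature from that tutorial is configured through device maps on the backend options. A minimal sketch, assuming two workers named "worker0" and "worker1" (the names and the 0 -> 0 mapping are illustrative only, not from this thread):

import torch.distributed.rpc as rpc

options = rpc.TensorPipeRpcBackendOptions(init_method="env://")
# Allow RPCs sent to "worker1" to carry CUDA tensors directly,
# mapping local cuda:0 to cuda:0 on the remote side.
options.set_device_map("worker1", {0: 0})
rpc.init_rpc("worker0", rank=0, world_size=2, rpc_backend_options=options)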

So the first thing I tried was setting os.environ["CUDA_VISIBLE_DEVICES"] = "" in cpu_process_initialization. But I hit the following error:

RuntimeError: In create at tensorpipe/common/cuda_lib.h:117 "lib.init(0)(100) CUDA_ERROR_NO_DEVICE (no CUDA-capable device is detected)"

@lcw any opinion on whether we should let TensorPipe or the TensorPipe agent detect CUDA device availability at runtime? (Created an issue to track: init_rpc fails after setting CUDA_VISIBLE_DEVICES env var to "" · Issue #60578 · pytorch/pytorch · GitHub)

Then I played with the number of processes; it looks like it crashes as long as CUDA processes + CPU processes > 18 (e.g., 8 CUDA processes + 10 CPU processes still works). I don't recall there being any hard limit on the number of processes in RPC. @lcw does TensorPipe have restrictions on that?

Below is the repro I used locally.

import os
import time
import torch
#torch.multiprocessing.set_sharing_strategy('file_system')
import torch.multiprocessing as mp
import torch.distributed.rpc as rpc

no_of_saver_processes = 10
#world_size = torch.cuda.device_count()
world_size = 8

def cpu_process_initialization(rank):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '9867'
    os.environ["CUDA_VISIBLE_DEVICES"] = f"{rank % torch.cuda.device_count()}"
    rpc.init_rpc(f"{rank}",
                 rank=rank,
                 world_size = world_size + no_of_saver_processes,
                 backend=rpc.BackendType.TENSORPIPE,
                 rpc_backend_options=rpc.TensorPipeRpcBackendOptions(rpc_timeout=6000,
                                                                     init_method='env://')
                 )
    print(f"Started CPU process {rank}")
    print(f"Process {rank}: avaialable device {torch.cuda.current_device()}")
    # Do something rather than sleeping example disk or cpu bound operations
    time.sleep(30)
    rpc.shutdown()
    return

def cuda_process_initialization(rank):
    #torch.cuda.set_device(rank)
    os.environ["CUDA_VISIBLE_DEVICES"] = f"{rank}"
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '9867'
    rpc.init_rpc(f"{rank}",
                 rank=rank,
                 world_size=world_size + no_of_saver_processes,
                 backend=rpc.BackendType.TENSORPIPE,
                 rpc_backend_options=rpc.TensorPipeRpcBackendOptions(#num_send_recv_threads=args.world_size*3,
                                                                     rpc_timeout=6000,
                                                                     init_method='env://')
                 )
    print(f"Started CUDA process on gpu {rank}")
    # Do some cuda operations
    print(f"Process {rank}: avaialable device {torch.cuda.current_device()}")
    time.sleep(30)
    rpc.shutdown()
    return

if __name__ == "__main__":
    mp.set_start_method('spawn', force=True)
    trainer_processes = mp.spawn(cuda_process_initialization,
                                 nprocs=world_size,
                                 join=False)
    cpu_processes = []
    for rank in range(world_size,world_size+no_of_saver_processes):
        p = mp.Process(target=cpu_process_initialization,
                       args=(rank,))
        p.start()
        cpu_processes.append(p)
    for p in cpu_processes: p.join()
    trainer_processes.join()

Looks like this is an issue with the SHM transport. I bumped up the UV transport priority to overshadow SHM, and the problem disappeared.

Hey @akshay-raj-dhamija, to unblock you, could you please try adding a _transports=["uv"] kill-switch to rpc_backend_options to disable the SHM transport?

Below is the code:

import os
import time
import torch
#torch.multiprocessing.set_sharing_strategy('file_system')
import torch.multiprocessing as mp
import torch.distributed.rpc as rpc

no_of_saver_processes = 15
#world_size = torch.cuda.device_count()
world_size = 4

def cpu_process_initialization(rank):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '9867'
    os.environ["CUDA_VISIBLE_DEVICES"] = f"{rank % torch.cuda.device_count()}"
    rpc.init_rpc(f"{rank}",
                 rank=rank,
                 world_size = world_size + no_of_saver_processes,
                 backend=rpc.BackendType.TENSORPIPE,
                 rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
                     rpc_timeout=6000,
                     init_method='env://',
                     _transports=["uv"],
                 )
    )
    print(f"Started CPU process {rank}")
    print(f"Process {rank}: avaialable device {torch.cuda.current_device()}")
    # Do something rather than sleeping example disk or cpu bound operations
    time.sleep(30)
    rpc.shutdown()
    return

def cuda_process_initialization(rank):
    #torch.cuda.set_device(rank)
    os.environ["CUDA_VISIBLE_DEVICES"] = f"{rank}"
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '9867'
    rpc.init_rpc(f"{rank}",
                 rank=rank,
                 world_size=world_size + no_of_saver_processes,
                 backend=rpc.BackendType.TENSORPIPE,
                 rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
                     #num_send_recv_threads=args.world_size*3,
                     rpc_timeout=6000,
                     init_method='env://',
                     _transports=["uv"],
                 )
    )
    print(f"Started CUDA process on gpu {rank}")
    # Do some cuda operations
    print(f"Process {rank}: avaialable device {torch.cuda.current_device()}")
    time.sleep(30)
    rpc.shutdown()
    return

if __name__ == "__main__":
    mp.set_start_method('spawn', force=True)
    trainer_processes = mp.spawn(cuda_process_initialization,
                                 nprocs=world_size,
                                 join=False)
    cpu_processes = []
    for rank in range(world_size,world_size+no_of_saver_processes):
        p = mp.Process(target=cpu_process_initialization,
                       args=(rank,))
        p.start()
        cpu_processes.append(p)
    for p in cpu_processes: p.join()
    trainer_processes.join()

Thanks @mrshenli, using _transports=["uv"] solved the problem.
Similar to setting os.environ["CUDA_VISIBLE_DEVICES"] = "", I tried setting the option devices=["cpu"], but as you noted it resulted in the following error:

ValueError: `set_devices` expect a list of CUDA devices, but got device type cpu.

You're likely hitting the limit of open file descriptors in your process. You can find this limit through ulimit -n and you can check how many file descriptors are in use by your process through ls /proc/$PID/fd | wc -l or lsof -p $PID | wc -l (ideally this should be done "just before" that error happens). Could you provide us those values so we can check this is indeed the cause?
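
A small Linux-only sketch of how those numbers could be gathered from inside the script itself, e.g. logged right before rpc.init_rpc and again shortly before the crash point (the helper name log_fd_usage is illustrative, not part of the thread):

import os
import resource

def log_fd_usage(tag):
    # Soft/hard limits on open file descriptors for this process.
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    # Number of fds currently open by this process (Linux: /proc/<pid>/fd).
    n_open = len(os.listdir(f"/proc/{os.getpid()}/fd"))
    print(f"[{tag}] pid={os.getpid()}: {n_open} open fds "
          f"(soft limit {soft}, hard limit {hard})")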

Thanks @lcw, you are right; it looks like v1.9 uses a lot more file descriptors (fds) than v1.7.

While v1.7 used close to 135 fds per process, v1.9 used close to 270 fds per process, with the rank-0 process reaching 960 fds. These numbers are for 3 GPU and 5 CPU processes.

Yes, increasing the ulimit fixed the issue for v1.9, which points to another possible improvement: in v1.7 we get a very clear error message for such a scenario, which looks like

OSError: [Errno 24] Too many open files: '/home/user/anaconda3/envs/pytorch1.7/lib/python3.8/multiprocessing/queues.py'

Such an error message would probably also be useful for v1.9 and future versions :slight_smile:

960 file descriptors is still a couple of orders of magnitude lower than what I've typically seen: in my experience each process by default appears to have space for up to 64k file descriptors. Are you running in some constrained environment where this limit is much lower? What's the output of ulimit -n that I asked for earlier?

Note that a lack of file descriptors can appear at any time, during any operation, in many forms: it's unrealistic to catch it consistently and provide a uniform error message.

It seems the default ulimit -n for both Ubuntu 18.04 and 20.04 Server is 1024 unless explicitly changed; mine was the same.

I am also a bit unsure why the number of file descriptors needs to be so much larger in v1.9 compared to v1.7. I probably need to read a bit more about the changes that were made, but any insights would be helpful.

TensorPipe isn't very "frugal" when it comes to file descriptors (simply because we believe that the system limits are high enough, or can be raised, for this not to be a problem in the vast majority of cases). Hence each RPC "link" between two processes uses multiple file descriptors, out of simplicity, and to better leverage the kernel's own capabilities for multiplexing and backpressure. With the advent of CUDA channels in TensorPipe, these channels also need some new file descriptors, hence the consumption increased. It is possible to optimize this usage, though I'm not sure when we'll get to it.

If you have a limit of 1024 on your system then yes, 960 is indeed worryingly close to it. Though I assume you're able to increase that limit and thus unblock yourself?

BTW @mrshenli, the fact that the file descriptor usage is much higher on node 0 is probably because the _all_gather function basically constructs a tree with node 0 at the root, hence node 0 always opens N-1 links. I don't think any action is needed now, but perhaps we should think about whether there are other approaches that don't put all that pressure on a single node.

Thanks for the explanation. Yes, I was unblocked by raising the ulimit.

I also have the same issue: the RPC program crashes if I set the world size to 30 or above. Is there any solution?

You could try increasing your ulimit with ulimit -n XYZ, where XYZ is a very high number. This change may be temporary, so you might need to repeat it in every session, or you could do it programmatically in your Python script, as shown below.
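
One way to do it programmatically, as a sketch assuming Linux and the standard-library resource module: raise the soft limit in the main process before spawning workers, so the child processes inherit it.

import resource

# Raise the soft limit on open file descriptors up to the hard limit.
# Going beyond the hard limit requires root or editing
# /etc/security/limits.conf. Do this before mp.spawn so children inherit it.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))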

If the reason you need RPC processes is simply to share data between different CPU processes, rather than sharing across different GPUs, you might consider downgrading your PyTorch version to 1.7. Note: there might be a better solution out there that I am unaware of.

Thanks for your reply! But it still doesn't work even if I set the related parameters.

This is my code:

import os
import time

from entity.Server import Server
import torch.multiprocessing as mp
from torch.distributed import rpc
from utils.options import args_parser
import torch.distributed as dist

SERVER_NAME = "Server"
CLIENT_NAME = "Client{}"

os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '9090'

def run(rank, args):
    if rank == 0:
        rpc.init_rpc(SERVER_NAME, rank=rank, world_size=args.world_size)
        # server = Server(args)
    else:
        rpc.init_rpc(CLIENT_NAME.format(rank), rank=rank, world_size=args.world_size)

    rpc.shutdown()

if __name__ == "__main__":
    # Note: each os.system call runs ulimit in its own child shell, so these
    # do not change the limits of this Python process or its spawned workers.
    os.system("ulimit -n 1000000")
    os.system("ulimit -d unlimited")
    os.system("ulimit -m unlimited")
    os.system("ulimit -s unlimited")
    os.system("ulimit -t unlimited")
    os.system("ulimit -v unlimited")

    args = args_parser()
    # run(args.rank, args)
    mp.spawn(
        run,
        args=(args, ),
        nprocs=args.world_size,
        join=True
    )

And error is:

-- Process 23 terminated with the following error:

Traceback (most recent call last):
  File "/root/anaconda3/envs/final/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrap
    fn(i, *args)
  File "/home/niclab/zyq/final/cfl/main.py", line 26, in run
    rpc.init_rpc(CLIENT_NAME.format(rank), rank=rank, world_size=args.world_size)
  File "/root/anaconda3/envs/final/lib/python3.9/site-packages/torch/distributed/rpc/__init__.py", line 190, in init_rpc
    _init_rpc_backend(backend, store, name, rank, world_size, rpc_backend_options)
  File "/root/anaconda3/envs/final/lib/python3.9/site-packages/torch/distributed/rpc/__init__.py", line 224, in _init_rpc_backend
    rpc_agent = backend_registry.init_backend(
  File "/root/anaconda3/envs/final/lib/python3.9/site-packages/torch/distributed/rpc/backend_registry.py", line 97, in init_backend
    return backend.value.init_backend_handler(*args, **kwargs)
  File "/root/anaconda3/envs/final/lib/python3.9/site-packages/torch/distributed/rpc/backend_registry.py", line 305, in _tensorpipe_init_backend_handler
    api._all_gather(None, timeout=rpc_backend_options.rpc_timeout)
  File "/root/anaconda3/envs/final/lib/python3.9/site-packages/torch/distributed/rpc/api.py", line 77, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/final/lib/python3.9/site-packages/torch/distributed/rpc/api.py", line 204, in _all_gather
    rpc_sync(
  File "/root/anaconda3/envs/final/lib/python3.9/site-packages/torch/distributed/rpc/api.py", line 77, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/final/lib/python3.9/site-packages/torch/distributed/rpc/api.py", line 767, in rpc_sync
    return fut.wait()
RuntimeError: connect: Resource temporarily unavailable (this error originated at tensorpipe/common/socket.cc:114)

I also tried downgrading the PyTorch version to 1.7.0; it can print the rank of each process, but when I try to create RRefs, I get the error ECONNREFUSED: connection refused.

This is my server code:

def initialize_client_modules(self, args):
    self.logger.info(f"Initialize {self.num_clients} Clients")
    for index in range(self.num_clients):
        self.client_rrefs.append(
            rpc.remote(f"Client{index+1}", Client,
                       args=(index+1, deepcopy(self.model), args))
        )
    self.logger.info("----------------------------------")

BTW, almost the same problem appears in PyTorch 2.1.2+cu121. In my task I run CPU-only training with RPC, in a ps <-> workers (parameter server / workers) style.
The error log is the following:

rpc transport error:

Process Process-1:
Traceback (most recent call last):
  File "/home/ross/anaconda3/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/ross/anaconda3/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ross/files/gits/torchnet_test/rpc_process/main.py", line 69, in run_pipe
    fut.wait()
RuntimeError: RPCErr:1:RPC ran for more than set timeout (60000 ms) and will now be marked with an error
[E thread_pool.cpp:112] Exception in thread pool task: unknown
FATAL: exception not rethrown
[W tensorpipe_agent.cpp:725] RPC agent for train_0 encountered error when reading incoming request from ps: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
[W tensorpipe_agent.cpp:939] RPC agent for train_0 encountered error when reading incoming response from ps: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
[W tensorpipe_agent.cpp:915] RPC agent for train_0 encountered error when sending outgoing request #8599 to ps: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)
On WorkerInfo(id=0, name=train_0):
RuntimeError('eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)')
Traceback (most recent call last):
  File "/home/ross/anaconda3/lib/python3.11/site-packages/torch/distributed/rpc/internal.py", line 207, in _run_function
    result = python_udf.func(*python_udf.args, **python_udf.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ross/files/gits/torchnet_test/rpc_process/main.py", line 36, in run_trainer
    output = remote_model(sample).view(-1)
             ^^^^^^^^^^^^^^^^^^^^
  File "/home/ross/anaconda3/lib/python3.11/site-packages/torch/nn/modules/module.py", line 1518, in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ross/anaconda3/lib/python3.11/site-packages/torch/nn/modules/module.py", line 1527, in _call_impl
    return forward_call(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ross/files/gits/torchnet_test/rpc_process/rpc_model.py", line 87, in forward
    out_ = _remote_method(MLP.forward, self.mlp_rref, emb_)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ross/files/gits/torchnet_test/rpc_process/rpc_model.py", line 14, in _remote_method
    return trpc.rpc_sync(rref.owner(), _call_method, args=[method, rref]+list(args), kwargs=kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ross/anaconda3/lib/python3.11/site-packages/torch/distributed/rpc/api.py", line 82, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/ross/anaconda3/lib/python3.11/site-packages/torch/distributed/rpc/api.py", line 813, in rpc_sync
    return fut.wait()
           ^^^^^^^^^^
RuntimeError: eof (this error originated at tensorpipe/transport/shm/connection_impl.cc:259)

When I changed the rpc_backend_options _transports to 'uv', the problem was solved :grinning:
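
For reference, the change presumably boils down to passing the private _transports option when building the backend options, as suggested earlier in the thread (the worker name, rank, and world size below are placeholders):

import torch.distributed.rpc as rpc

options = rpc.TensorPipeRpcBackendOptions(
    init_method="env://",
    _transports=["uv"],  # skip the SHM transport and fall back to TCP (libuv)
)
rpc.init_rpc("ps", rank=0, world_size=2, rpc_backend_options=options)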