Pytorch Distributed RPC bottleneck in _recursive_compile_class

I am attempting to use Pytorch Distributed RPC with a large number of requests in flight, using multiprocessing on a single machine. I am using the latest nightly Pytorch.

I am finding that there is a bottleneck inside a weird place in the pytorch code. The vast majority of time is spent in _recursive_compile_class. This seems wrong to me, because I shouldn’t be compiling code for every RPC (every RPC is for the same function).

Here is the most common stack trace:

__pthread_cond_timedwait (/lib/x86_64-linux-gnu/libpthread-2.31.so:0)
exists (/usr/lib/python3.9/genericpath.py:19)
getsourcefile (/usr/lib/python3.9/inspect.py:706)
findsource (/usr/lib/python3.9/inspect.py:817)
getsourcelines (/usr/lib/python3.9/inspect.py:1006)
getsource (/usr/lib/python3.9/inspect.py:1024)
get_type_hint_captures (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/_jit_internal.py:321)
createResolutionCallbackForClassMethods (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/_jit_internal.py:376)
_recursive_compile_class (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/jit/_script.py:1164)
pybind11::detail::simple_collector<(pybind11::return_value_policy)1>::call (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/lib/libtorch_python.so:0)
torch::jit::tryToInferType (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/lib/libtorch_python.so:0)
c10::ivalue::ConcretePyObjectHolder::tryToInferType (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/lib/libtorch_python.so:0)
c10::IValue::getSubValues (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/lib/libtorch_cpu.so:0)
at::cuda::CUDAFuture::extractDataPtrs (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/lib/libtorch_python.so:0)
at::cuda::CUDAFuture::preMarkCompletedHook (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/lib/libtorch_python.so:0)
c10::ivalue::Future::markCompleted (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/lib/libtorch_python.so:0)
std::_Function_handler<void (), c10::ivalue::Future::then(std::function<c10::IValue()>, std::shared_ptr<c10::Type>)::{lambda()#1}>::_M_invoke (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/lib/libtorch_python.so:0)
std::_Function_handler<void (), at::cuda::CUDAFuture::wrapCallback(std::function<void ()>)::{lambda()#1}>::_M_invoke (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/lib/libtorch_python.so:0)
c10::ivalue::Future::markCompletedWithDataPtrs (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/lib/libtorch_python.so:0)
std::_Function_handler<void (), torch::distributed::rpc::toPyJitFuture(std::shared_ptr<c10::ivalue::Future> const&, bool)::{lambda()#1}>::_M_invoke (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/lib/libtorch_python.so:0)
std::_Function_handler<void (), std::function<void ()> at::wrapPropagateTLSState<void>(std::function<void ()>)::{lambda()#1}>::_M_invoke (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/lib/libtorch_python.so:0)
std::_Function_handler<void (), at::cuda::CUDAFuture::wrapCallback(std::function<void ()>)::{lambda()#1}>::_M_invoke (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/lib/libtorch_python.so:0)
std::_Function_handler<void (), torch::distributed::rpc::TensorPipeAgent::markFutureAsComplete(std::shared_ptr<torch::distributed::rpc::TensorPipeAgent::AtomicJitFuture>, torch::distributed::rpc::Message, std::shared_ptr<torch::distributed::rpc::LazyStreamContext>)::{lambda()#1}>::_M_invoke (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/lib/libtorch_python.so:0)
c10::ThreadPool::main_loop (/home/jeremy/PycharmProjects/hearthstone_battlegrounds/venv/lib/python3.9/site-packages/torch/lib/libc10.so:0)
0x7f8b6e3eeed0 (/usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.28:0)

My full test case is available at stone_ground_hearth_battles/test_pytorch_distributed.py (commit 15534b50902c52d0be39700f783d18655083a794) in the JDBumgardner/stone_ground_hearth_battles repository on GitHub.

_recursive_compile_class is specific to JIT, so it would help to add the “jit” category to the question. My intuition as to why it must recompile every time is that RPC calls allow for user-defined logic with conditionals/branching, so it would not be possible to compile only once, but I am not completely sure of this.

cc: @wanchaol

@H-Huang Thanks for the response. I should also be clear that I don’t see any reason why my code would have to be JIT’d for an RPC call. My code does not use JIT at all, so the JIT compilation that is occurring must be an internal implementation detail of the Pytorch RPC API. I don’t find it that weird that something might be JIT’d once, but it seems really weird to me that it would internally be JITting for every RPC.

I am also unable to figure out how to add the “jit” tag to this post.

My apologies, this is part of the internal implementation for RPC, looking now.

@jeremysalwen Thanks for reporting this. Could you create a GitHub issue for this on pytorch/pytorch with a minimal repro?

cc @lcw Since it does seem related to the CudaFuture type inference.

This happens because when using RPC with CUDA tensors we need to inspect the values held by the Future objects in order to extract the tensors they contain (because we need to “track” those tensors across CUDA streams). Since we inspect those values from C++, we need to first convert Python objects to JIT objects in order to be able to handle them. This apparently is more expensive than we thought. Note that this only happens for CUDA tensors, and it only happens for callbacks (i.e., when you use the then method) because in those cases the value is a user-provided one and thus we need to re-inspect it every time as we cannot assume anything about it.

We were discussing possible changes to how CUDA Futures work, and this consideration will factor in, but I don’t yet know what the final outcome will be.

Thanks @lcw, my one question is whether there is a way around using the .then() method. In my case, I am using it to convert a Pytorch Future into an asyncio future. Is there an alternative way to do this without using .then (and without having to JIT the objects)?

@jeremysalwen I’m not exactly sure what you need, but you could consider using the add_done_callback method (see here). It behaves almost like then, except that it doesn’t return anything (and thus the callback passed to it is also supposed to not return anything).
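For illustration, a minimal sketch of the difference between the two methods, assuming RPC has already been initialized and a peer named "worker1" exists (both names here are hypothetical):

import torch
import torch.distributed.rpc as rpc

# Assumes rpc.init_rpc() has already been called and "worker1" is reachable.
fut = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 1))

# .then() wraps the callback's return value in a new Future...
chained = fut.then(lambda f: f.wait() * 2)

# ...while .add_done_callback() returns None, so the callback is only
# useful for its side effects.
fut.add_done_callback(lambda f: print(f.value()))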

As a side note, I’d be curious to hear how you’re integrating PyTorch’s RPC with asyncio: we’ve been toying with some ideas to make them more compatible but weren’t sure if there was actually need/demand for it.


Hi @lcw. .add_done_callback() was exactly what I was looking for. Using it instead of .then() resolves my bottleneck issue completely.

I was able to successfully integrate Pytorch’s RPC with asyncio in a way that I think works well.

Basically, I have my long-running Pytorch RPCs launch their own event loop (there is one RPC per core on my machine, each to a different worker process). Then, I just have a thin non-async function to wrap the async function. The non-async function is the one that Pytorch RPC directly calls.

import asyncio

async def async_my_rpc_function():
    ...  # real logic goes here

def my_rpc_function():
    # Thin non-async wrapper; this is the function Pytorch RPC calls directly.
    asyncio.get_event_loop().run_until_complete(async_my_rpc_function())

Once I am inside “async-land”, I can create new async tasks, wait on them, etc. To keep things simple: 1. I only have one active RPC to this top-level function at a time, meaning each process has only one thread and one asyncio event loop. 2. I make sure not to create any background tasks that will outlive the RPC, so that when the RPC returns, the event loop is empty, and it is OK that it will be dead until the next RPC is called.

The only other piece necessary to get concurrency working properly is a way to await a Pytorch RPC from async-land, so that making RPCs doesn’t block the event loop. To do this, I use the following snippet of code (note how I am using .add_done_callback() now):

# Inside an async method: bridge the Pytorch RPC future into an asyncio future.
loop = asyncio.get_event_loop()
f = loop.create_future()
self.inference_queue.rpc_async().my_rpc_function().add_done_callback(
    lambda fut: loop.call_soon_threadsafe(f.set_result, fut.value())
)
result = await f

This allows the asyncio event loop to be safely notified by the pytorch RPC thread when the RPC is complete.

I think that the “nice” thing to do would be to implement __await__ directly on the Pytorch Future object, with the implementation doing something like the snippet above. That said, I am not an expert on this and relatively new to asyncio, so take my suggestion with a grain of salt.
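For what it’s worth, here is a rough sketch of what such a wrapper could look like when implemented outside of Pytorch (the class name is made up, and this is not part of the Pytorch API); it also forwards errors instead of dropping them:

import asyncio

class AwaitableTorchFuture:
    """Hypothetical wrapper that makes a torch.futures.Future awaitable."""

    def __init__(self, torch_fut):
        self._torch_fut = torch_fut

    def __await__(self):
        loop = asyncio.get_event_loop()
        aio_fut = loop.create_future()

        def _on_done(fut):
            try:
                result = fut.value()  # raises if the RPC failed
            except Exception as exc:
                loop.call_soon_threadsafe(aio_fut.set_exception, exc)
            else:
                loop.call_soon_threadsafe(aio_fut.set_result, result)

        self._torch_fut.add_done_callback(_on_done)
        return aio_fut.__await__()

With something like this, the snippet above would reduce to result = await AwaitableTorchFuture(self.inference_queue.rpc_async().my_rpc_function()).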

asyncio itself has some support for multiprocessing worker pools executing tasks (so something like an internal RPC), but I basically ignored that. It’s possible that a better integration could be achieved by understanding how that part of asyncio works (see the concurrent.futures documentation).
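For reference, the built-in mechanism I am referring to is roughly loop.run_in_executor with a concurrent.futures.ProcessPoolExecutor; a minimal sketch (the worker function is made up):

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_work(x):
    # Runs in a separate worker process.
    return x * x

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Offloads the call to a worker process without blocking the event loop.
        result = await loop.run_in_executor(pool, cpu_bound_work, 7)
        print(result)

asyncio.run(main())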

Great, thanks for the detailed explanation, it’s really helpful!

You definitely seem to know what you’re doing! Here are a few comments in case you haven’t thought about them already:

  • The way you “bind” a PyTorch future to an asyncio future, through add_done_callback, works well for successful RPCs, but it might not work in case of errors, as calling fut.value() would raise an exception, and thus f.set_result would never be called. In those cases your asyncio future would never become complete and any piece of code that’s awaiting it might remain stuck forever, leading to a deadlock.
  • By running asyncio.get_event_loop().run_until_complete(...) (which BTW I think can be shortened to just asyncio.run(...) starting in Python 3.7) you block the calling thread until all asyncio tasks launched within that loop are complete. The thread, in this case, comes from RPC’s internal pool, which has only a limited number of threads. This means that you can only have so many active RPC calls at the same time. This may be exactly what you want, but another way of doing this could be for you to manually start a dedicated single thread for the asyncio loop, and have all incoming RPC calls schedule a task on that thread. If you use the async_execution decorator you can also create an RPC future, have your asyncio task complete that future at a later point in time, and have your RPC function return immediately. This way you could release the thread to the RPC pool while your asyncio thread continues processing the task, meaning the RPC thread could be reused for another request. In the end, your asyncio thread would be able to handle an arbitrarily large number of requests at the same time. (A rough sketch of this pattern follows below.)
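A rough sketch of that async_execution pattern, assuming a module-level asyncio loop running in a dedicated thread (the function and variable names here are made up):

import asyncio
import threading
import torch
from torch.distributed import rpc

# One dedicated asyncio loop per worker process, running in its own thread.
_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()

@rpc.functions.async_execution
def my_rpc_function(x):
    # The RPC completes when this Future is marked done, not when we return.
    ret = torch.futures.Future()

    async def work():
        try:
            ret.set_result(x + 1)  # real async logic goes here...
        except Exception as exc:
            ret.set_exception(exc)

    # Hand the coroutine to the asyncio thread and return immediately,
    # releasing the calling thread back to the RPC pool.
    asyncio.run_coroutine_threadsafe(work(), _loop)
    return ret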