Hi @lcw. `.add_done_callback()` was exactly what I was looking for. Using it instead of `.then()` resolves my bottleneck issue completely.
I was able to integrate PyTorch's RPC with asyncio in a way that I think works well.
Basically, I have my long-running PyTorch RPCs launch their own event loop (there is one RPC per core on my machine, each to a different worker process). Then I have a thin non-async function that wraps the async function; the non-async function is the one PyTorch RPC calls directly:
```python
import asyncio

async def async_my_rpc_function():
    ...  # real logic goes here

def my_rpc_function():
    asyncio.get_event_loop().run_until_complete(async_my_rpc_function())
```
Once I am inside "async-land", I can create new async tasks, await them, etc. To keep things simple:

1. I only have one active RPC to this top-level function at a time, meaning each process has only one thread and one asyncio event loop.
2. I make sure not to create any background tasks that will outlive the RPC, so that when the RPC returns the event loop is empty, and it is OK for it to sit idle until the next RPC is called.
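To illustrate constraint 2, here is a minimal stdlib-only sketch of the pattern (the subtask logic is made up for illustration): subtasks are created inside the async function and gathered before returning, so nothing outlives the RPC and the loop is empty between calls.

```python
import asyncio

# Each worker process launches its own event loop, as described above.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

async def async_my_rpc_function():
    # Hypothetical subtask; in real code this would do actual work.
    async def subtask(i):
        await asyncio.sleep(0)  # yield to the event loop
        return i * i

    # Fan out background tasks, then gather them all before returning,
    # so the event loop is empty when the RPC completes.
    tasks = [asyncio.ensure_future(subtask(i)) for i in range(4)]
    return await asyncio.gather(*tasks)

def my_rpc_function():
    # Thin non-async wrapper that PyTorch RPC would call directly.
    return asyncio.get_event_loop().run_until_complete(async_my_rpc_function())

print(my_rpc_function())  # [0, 1, 4, 9]
```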
The only other piece needed to get concurrency working properly is a way to await a PyTorch RPC from async-land, so that making RPCs doesn't block the event loop. To do this, I use the following snippet (note that I am now using `.add_done_callback()`):
```python
loop = asyncio.get_event_loop()
f = loop.create_future()
self.inference_queue.rpc_async().my_rpc_function().add_done_callback(
    lambda fut: loop.call_soon_threadsafe(f.set_result, fut.value())
)
result = await f
```
This allows the asyncio event loop to be safely notified by the PyTorch RPC thread when the RPC is complete.
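The pattern generalizes to any callback-style future completed on another thread. A stdlib-only sketch, using `concurrent.futures.Future` as a stand-in for the PyTorch future (`await_callback_future` and `fake_rpc` are names I made up; PyTorch's `.value()` is replaced by `.result()` here):

```python
import asyncio
import threading
import concurrent.futures

def await_callback_future(cf):
    """Bridge a callback-style future (completed on another thread)
    into the current asyncio event loop."""
    loop = asyncio.get_event_loop()
    f = loop.create_future()
    # add_done_callback fires on the completing thread, so hand the
    # result back to the loop with call_soon_threadsafe.
    cf.add_done_callback(
        lambda fut: loop.call_soon_threadsafe(f.set_result, fut.result())
    )
    return f

def fake_rpc():
    # Stand-in for rpc_async(): a future completed from a worker thread.
    cf = concurrent.futures.Future()
    threading.Timer(0.01, lambda: cf.set_result(42)).start()
    return cf

async def main():
    return await await_callback_future(fake_rpc())

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
print(loop.run_until_complete(main()))  # 42
```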
I think the "nice" thing to do would be to implement `__await__`
directly on the PyTorch Future object, with an implementation along the lines of the snippet above. That said, I am not an expert and am relatively new to asyncio, so take my suggestion with a grain of salt.
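A sketch of what such an `__await__` might look like, again using `concurrent.futures.Future` as a stand-in for the PyTorch future (the class name `AwaitableFuture` is invented for illustration):

```python
import asyncio
import threading
import concurrent.futures

class AwaitableFuture:
    """Wraps a callback-style future so it can be awaited directly."""

    def __init__(self, cf):
        self._cf = cf

    def __await__(self):
        loop = asyncio.get_event_loop()
        f = loop.create_future()
        # Same bridging trick as above, hidden behind the await syntax.
        self._cf.add_done_callback(
            lambda fut: loop.call_soon_threadsafe(f.set_result, fut.result())
        )
        return f.__await__()

def fake_rpc():
    # Stand-in for rpc_async(): a future completed from a worker thread.
    cf = concurrent.futures.Future()
    threading.Timer(0.01, lambda: cf.set_result("done")).start()
    return cf

async def main():
    return await AwaitableFuture(fake_rpc())

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
print(loop.run_until_complete(main()))  # done
```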
asyncio itself has some support for worker pools executing tasks (so something like an internal RPC), but I basically ignored that. A better integration might be possible by understanding how that part of asyncio works (concurrent.futures — Launching parallel tasks — Python 3.9.4 documentation).
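For reference, that stdlib integration works roughly like this: `loop.run_in_executor()` submits work to a `concurrent.futures` pool and returns an awaitable. A minimal sketch using a thread pool (the same API works with `ProcessPoolExecutor` for true parallelism; `blocking_work` is a made-up placeholder):

```python
import asyncio
import concurrent.futures

def blocking_work(x):
    # Stand-in for a CPU- or IO-bound task.
    return x * 2

async def main():
    loop = asyncio.get_event_loop()
    # run_in_executor submits to the pool and returns an awaitable,
    # so the event loop is not blocked while the work runs.
    with concurrent.futures.ThreadPoolExecutor() as pool:
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, blocking_work, i) for i in range(3))
        )
    return results

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
print(loop.run_until_complete(main()))  # [0, 2, 4]
```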