rpc.functions.async_execution, RPC, and futures

Hi Folks,

Can someone please help me understand why this is not working? My understanding was that a torch Future is an asyncio future.

Remote worker:

class Foo:
    @rpc.functions.async_execution
    def foo(self, agent_rref, n_steps, meta_tasks):
        x = []
        y = []
        # x and y are populated here, so both hold futures
        return x, y

class Agent:
    def bar(self):
        ft = ob_rref.rpc_async().foo(self.agent_rref, n_steps, meta_tasks)
        self.trainer.step(*ft)

where step expects asyncio futures.

So ft should be a future, but I can't wait on it, pass it along, or unpack it.

class Trainer:
    def step(self, x, y):
        # In my old code, x and y are lists of futures,
        # so the downstream code does `await x` or gathers *x.
        ...

But it looks like a torch Future is not iterable.

Basically, I'm moving this code to RPC; before that, it used Python 3.10 asyncio:

x = [self._event_loop.create_future() for _ in X]
y = [[self._event_loop.create_future() for _ in Y] for _ in range(steps)]

So these are lists of futures.

So I guess my question is: how can I convert a torch Future to an asyncio Future?

A torch.futures.Future is not an asyncio future; you can refer to the torch.futures page of the PyTorch 1.13 documentation.

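You can probably convert a torch Future to an asyncio Future as follows:

torch_fut = rpc.rpc_async(...)
loop = asyncio.get_running_loop()
asyncio_fut = loop.create_future()

# The callback may run outside the event loop thread, so hand the result back thread-safely.
torch_fut.then(
    lambda fut: loop.call_soon_threadsafe(asyncio_fut.set_result, fut.wait())
)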
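To make the conversion idea concrete, here is a minimal sketch of a reusable bridge that also forwards exceptions and works for a whole list of futures passed to asyncio.gather. It is written against only the then() and wait() methods, which torch.futures.Future provides, so it should accept a torch Future unchanged, though I have not tested it against RPC. To keep the example runnable without a process group, it uses a hypothetical StandInFuture class in place of a real torch Future; to_asyncio_future and StandInFuture are illustrative names, not part of PyTorch.

```python
import asyncio
import threading


class StandInFuture:
    """Hypothetical stand-in with the then()/wait()/set_result() subset of
    torch.futures.Future. Simplification: then() returns None here, whereas
    the torch method returns a new Future."""

    def __init__(self):
        self._done = threading.Event()
        self._lock = threading.Lock()
        self._callbacks = []
        self._value = None

    def set_result(self, value):
        with self._lock:
            self._value = value
            self._done.set()
            callbacks, self._callbacks = self._callbacks, []
        for cb in callbacks:
            cb(self)  # callbacks run on the completing thread

    def then(self, callback):
        with self._lock:
            if not self._done.is_set():
                self._callbacks.append(callback)
                return
        callback(self)  # already completed: run immediately

    def wait(self):
        self._done.wait()
        return self._value


def to_asyncio_future(src_fut, loop):
    """Return an asyncio future that completes when src_fut completes."""
    aio_fut = loop.create_future()

    def _finish(value, exc):
        # Runs on the event loop thread.
        if aio_fut.cancelled():
            return
        if exc is not None:
            aio_fut.set_exception(exc)
        else:
            aio_fut.set_result(value)

    def _on_done(fut):
        # May run on a non-loop thread, so only schedule work on the loop.
        try:
            value, exc = fut.wait(), None
        except Exception as e:
            value, exc = None, e
        loop.call_soon_threadsafe(_finish, value, exc)

    src_fut.then(_on_done)
    return aio_fut


async def main():
    loop = asyncio.get_running_loop()
    sources = [StandInFuture() for _ in range(3)]
    bridged = [to_asyncio_future(f, loop) for f in sources]
    # Complete each source from a timer thread, as an RPC thread might.
    for i, f in enumerate(sources):
        threading.Timer(0.01 * (i + 1), f.set_result, args=(i * 10,)).start()
    return await asyncio.gather(*bridged)


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints [0, 10, 20]
```

With a bridge like this, downstream code that awaits or gathers lists of asyncio futures can stay as it is; only the point where the RPC call is issued needs to wrap the returned future.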

Thank you very much, let me try. But if it is a wrapper around torch._C.Future, what can you do from an asyncio perspective if the downstream code uses asyncio?

Does torch._C.Future behave similarly to Python's thread-based futures?
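For comparison, here is how Python's own thread futures (concurrent.futures.Future) are consumed from asyncio. They are not awaitable directly either, but the standard library ships asyncio.wrap_future for them. As far as I can tell, a torch Future is not a concurrent.futures.Future instance, so wrap_future should not be assumed to accept it; the sketch below only illustrates the standard-library side, and slow_square is a made-up workload.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def slow_square(n):
    # Hypothetical workload executed on a worker thread.
    return n * n


async def main():
    with ThreadPoolExecutor(max_workers=2) as pool:
        thread_futs = [pool.submit(slow_square, n) for n in range(4)]
        # wrap_future turns each concurrent.futures.Future into an awaitable
        # asyncio future bound to the running event loop.
        aio_futs = [asyncio.wrap_future(f) for f in thread_futs]
        return await asyncio.gather(*aio_futs)


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints [0, 1, 4, 9]
```

Both kinds of future complete on a thread other than the event loop's, which is why either one needs an explicit bridge before asyncio code can await it.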