Hi folks,
I have a bizarre issue and can't narrow down the problem.
The setup is Agent <--> Observer.
(The Observer takes the tensors and sends them via async RPC.)
Observer:

# The agent issues this call; foobar_rpc moves the tensor from CUDA to CPU and returns it.
def foobar_rpc():
    return tensor.clone().cpu()
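For reference, a runnable stand-alone sketch of that clone-to-CPU pattern (a CPU tensor is used here as a stand-in for the CUDA one, and the detach() is my addition; the function body beyond clone().cpu() is an assumption):

```python
import torch

def foobar_rpc(tensor):
    # Detach from the autograd graph and materialize a CPU copy, so the
    # value returned over RPC owns its own storage and holds no CUDA view.
    return tensor.detach().clone().cpu()

src = torch.ones(3)   # stand-in for a CUDA tensor on a CPU-only box
out = foobar_rpc(src)
src[0] = 99.0         # mutating the source does not touch the clone
print(out.tolist())   # -> [1.0, 1.0, 1.0]
```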
The agent takes the returned future and puts it on an asyncio queue. Then, on dequeue, the agent does:
Agent:

async def the_rpc_call():
    fu = rpc_async().foobar()   # args etc. removed for clarity
    await THE_QUEUE.put(fu)

async def foo():
    # loop code skipped
    the_future = await THE_QUEUE.get()   # <- asyncio queue
    the_future.wait()                    # wait for the torch future
    val = the_future.value()             # read the value
    x, y = val   # assume x and y are two named tuples
i.e.,
x.rewards       <-- tensor
x.observation   <-- tensor
x.someint       <-- native Python type
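For what it's worth, here is a stdlib-only sketch of the agent's shape, using a concurrent.futures.Future as a stand-in for the torch future (all names here are hypothetical). Note that a bare wait()/result() inside a coroutine blocks the whole event loop, so this sketch hands the blocking wait to a worker thread instead:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

async def consume(queue):
    fu = await queue.get()
    loop = asyncio.get_running_loop()
    # fu.result() (like a torch future's wait()) blocks the calling thread;
    # run it in a worker thread so the event loop keeps spinning.
    return await loop.run_in_executor(None, fu.result)

async def main():
    pool = ThreadPoolExecutor(2)
    queue = asyncio.Queue()
    # Stand-in for rpc_async().foobar(): a future fulfilled on another thread.
    fu = pool.submit(lambda: (time.sleep(0.05), 42)[1])
    await queue.put(fu)
    return await consume(queue)

result = asyncio.run(main())
print(result)  # -> 42
```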
# Reads work fine:
xsum = torch.sum(x.rewards, dim=...)
print(xsum)                     # <-- no issue

# Any left-hand assignment fails:
some_dict, some_list = xsum     # <-- error in RPC, or blocks
left_hand = xsum.clone()        # <-- error
left_hand = xsum.copy_(...)     # <-- same error

# Anything else either blocks, crashes, or shows an error in RPC.
# Yet passing the tensors on works:
await some_foo(x, y)            # no errors at all
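As a sanity check, note that tuple-unpacking a tensor iterates over dim 0, so a dim-less sum (which yields a 0-dim scalar) fails to unpack regardless of RPC. This may be unrelated to the issue, but it is worth ruling out; a minimal local repro:

```python
import torch

xsum = torch.sum(torch.ones(4))  # no dim argument -> 0-dim scalar tensor
try:
    a, b = xsum                  # unpacking iterates over dim 0
    caught = None
except TypeError as err:
    # torch refuses to iterate a 0-dim tensor
    caught = type(err).__name__
print(caught)  # -> TypeError
```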
It looks like somewhere in downstream code there is a lock: read-only access works, but left-hand assignment fails for tensors. But if that were the case, the last call, some_foo(x, y), should produce the same behavior; that is, passing x and y to downstream code should also fail, yet it doesn't.
Does the RPC logic do anything specific to the tensors it receives, deep in the library?
Thank you,