I have a bizarre issue and can't narrow the problem down.
Agent <--> Observer
(the Observer takes the tensors and sends them via async RPC)
Observer side:

```python
# The agent issues this call; foobar_rpc moves the tensor from CUDA to CPU and returns it.
def foobar_rpc():
    return tensor.clone().cpu()
```
The agent takes the returned future and puts it into an asyncio queue.
Agent side, the coroutine that issues the RPC and enqueues the torch future:

```python
async def the_rpc_call():
    fut = rpc_async(...).foobar()  # args etc. removed for clarity
    await THE_QUEUE.put(fut)
```

Then a consumer coroutine dequeues and reads the result:

```python
async def foo():
    # loop code skipped
    the_future = await THE_QUEUE.get()  # asyncio queue
    the_future.wait()                   # wait on the torch future
    val = the_future.value()            # read the value
    x, y = val                          # assume x and y are two named tuples
    # x.rewards      <-- tensor
    # x.observation  <-- tensor
    # x.someint      <-- native Python type

    # Read-only use is fine:
    xsum = torch.sum(x.rewards, dim=0)
    print(xsum)                         # <-- no issue

    # But any left-hand assignment / copy fails:
    some_dict["k"] = xsum               # <-- error in RPC, or blocks (same for a list)
    left_hand = xsum.clone()            # <-- error
    left_hand = copy.copy(xsum)         # <-- same error
    # Anything else I try either blocks, crashes, or raises an error inside RPC.

    await some_foo(x, y)                # yet this produces no errors at all
```
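For anyone who wants to see the shape of the pipeline without torch, here is a minimal runnable sketch of the same queue/future hand-off. This is only an analogy, not my actual code: `concurrent.futures.Future` stands in for the torch RPC future, the payload is a plain list instead of tensors, and `fake_rpc`/`producer`/`consumer` are hypothetical names:

```python
import asyncio
import time
from concurrent.futures import Future, ThreadPoolExecutor


def fake_rpc(pool: ThreadPoolExecutor) -> Future:
    """Stand-in for rpc_async(): returns a Future resolved on another thread."""
    def work():
        time.sleep(0.01)  # pretend the observer does the CUDA -> CPU copy here
        return ([1.0, 2.0, 3.0], "y-payload")
    return pool.submit(work)


async def producer(queue: asyncio.Queue, pool: ThreadPoolExecutor, n: int):
    for _ in range(n):
        fut = fake_rpc(pool)   # like rpc_async(...).foobar()
        await queue.put(fut)
    await queue.put(None)      # sentinel: done producing


async def consumer(queue: asyncio.Queue, results: list):
    while True:
        fut = await queue.get()
        if fut is None:
            break
        x, y = fut.result()    # like fut.wait() + fut.value(); note: blocks the event loop
        results.append(sum(x)) # read-only use of the payload


async def main():
    queue = asyncio.Queue()
    results = []
    with ThreadPoolExecutor() as pool:
        await asyncio.gather(producer(queue, pool, 3),
                             consumer(queue, results))
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))  # → [6.0, 6.0, 6.0]
```

One caveat the sketch makes visible: `fut.result()` (like the torch future's `wait()`) blocks the whole event loop; a non-blocking variant would be `await asyncio.wrap_future(fut)`, which works for `concurrent.futures.Future` objects.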
It looks as though somewhere in the downstream code there is a lock: read-only access works, but any left-hand assignment involving the tensors fails. But if that were the case, the last call, some_foo(x, y), should show the same behavior, i.e. passing x and y into downstream code should also fail, and it doesn't.
Does the RPC logic do anything special, deep in the library, with the tensors it receives?