Hi,
Context
I have a simple algorithm that distributes a number of tasks across a list of `Process`es; the workers then send their results back through a `Queue`. I was previously using numpy arrays for this kind of job.
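For reference, here is a minimal sketch of the numpy version of this pipeline (names are illustrative, not my real code): each worker puts its result on the queue, then puts a None sentinel to signal it is done.

```python
import multiprocessing as mp
import numpy as np

def numpy_worker(done_queue):
    # Compute a result and send it back; None signals that
    # this worker has finished.
    done_queue.put(np.random.rand(10, 10))
    done_queue.put(None)

def run_pipeline(nb_workers=2):
    done_queue = mp.Queue()
    workers = []
    for _ in range(nb_workers):
        p = mp.Process(target=numpy_worker, args=(done_queue,))
        p.start()
        workers.append(p)

    # Collect results until every worker has sent its sentinel
    results, ended = [], 0
    while ended != nb_workers:
        item = done_queue.get(timeout=30)
        if item is None:
            ended += 1
        else:
            results.append(item)
    for p in workers:
        p.join()
    return results
```

This works fine with numpy arrays, which pickle as ordinary objects.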
Problem
To be more consistent with my code, I decided to use only torch tensors. Unfortunately, it seems that transferring a torch.Tensor
over a Queue is not possible, maybe because of pickling. I get this kind of error when calling the get()
method to retrieve the result from my Queue:
    worker_result = done_queue.get()
  File "/home/ganaye/deps/miniconda3/lib/python3.5/multiprocessing/queues.py", line 113, in get
    return ForkingPickler.loads(res)
  File "/home/ganaye/deps/miniconda3/lib/python3.5/site-packages/torch/multiprocessing/reductions.py", line 70, in rebuild_storage_fd
    fd = df.detach()
  File "/home/ganaye/deps/miniconda3/lib/python3.5/multiprocessing/resource_sharer.py", line 57, in detach
    with _resource_sharer.get_connection(self._id) as conn:
  File "/home/ganaye/deps/miniconda3/lib/python3.5/multiprocessing/resource_sharer.py", line 87, in get_connection
    c = Client(address, authkey=process.current_process().authkey)
  File "/home/ganaye/deps/miniconda3/lib/python3.5/multiprocessing/connection.py", line 493, in Client
    answer_challenge(c, authkey)
  File "/home/ganaye/deps/miniconda3/lib/python3.5/multiprocessing/connection.py", line 732, in answer_challenge
    message = connection.recv_bytes(256)         # reject large message
  File "/home/ganaye/deps/miniconda3/lib/python3.5/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/home/ganaye/deps/miniconda3/lib/python3.5/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/home/ganaye/deps/miniconda3/lib/python3.5/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
ConnectionResetError: [Errno 104] Connection reset by peer
Conclusion
After quickly switching back to numpy and sending random numpy arrays, I am convinced the problem comes from using torch tensors. The error can be reproduced with this code:
Code
import multiprocessing as mp
import torch

def extractor_worker(done_queue):
    done_queue.put(torch.Tensor(10, 10))
    done_queue.put(None)  # sentinel: this worker is done

if __name__ == '__main__':
    producers = []
    done_queue = mp.Queue()
    for i in range(0, 1):
        process = mp.Process(target=extractor_worker,
                             args=(done_queue,))
        process.start()
        producers.append(process)

    result_arrays = []
    nb_ended_workers = 0
    while nb_ended_workers != 1:
        worker_result = done_queue.get()
        if worker_result is None:
            nb_ended_workers += 1
        else:
            result_arrays.append(worker_result)
Surprisingly, writing the object to the queue seems to work, but reading it back throws the error.
If any of you has a workaround for sending torch tensors over a Queue, please share! I can switch back to numpy if this is really impossible.
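Failing a real fix, the fallback I am considering is to round-trip through numpy at the queue boundary: workers call `.numpy()` before `put()`, and the consumer rebuilds the tensor with `torch.from_numpy()` after `get()`. A sketch of that idea (illustrative names, not tested beyond this toy case):

```python
import multiprocessing as mp
import torch

def extractor_worker_np(done_queue):
    # Send a plain numpy array instead of the tensor itself, so the
    # queue falls back to ordinary pickling instead of torch's
    # shared-storage machinery.
    done_queue.put(torch.rand(10, 10).numpy())
    done_queue.put(None)  # sentinel: this worker is done

def collect(nb_workers=1):
    done_queue = mp.Queue()
    workers = []
    for _ in range(nb_workers):
        p = mp.Process(target=extractor_worker_np, args=(done_queue,))
        p.start()
        workers.append(p)

    results, ended = [], 0
    while ended != nb_workers:
        item = done_queue.get(timeout=30)
        if item is None:
            ended += 1
        else:
            results.append(torch.from_numpy(item))  # back to a tensor
    for p in workers:
        p.join()
    return results
```

This adds a copy/conversion at each end, which I would rather avoid, but it keeps the rest of the code in torch.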
Thanks!