Tensors as items in multiprocessing.Queue

Hi, I was wondering if there is anything wrong with the example below. Basically, I need several processes to enqueue tensors in a shared torch.multiprocessing.Queue. I figured I'd ask here first before opening an issue on GitHub.

import torch
import torch.multiprocessing as mp

def put_in_q(idx, q):
    q.put(torch.IntTensor(2, 2).fill_(idx))
    # q.put(idx)  # works with int, float, str, np.ndarray, but not torch.Tensor

q = mp.Queue()

p = mp.Process(target=put_in_q, args=(0, q))
p.start()

x = q.get()
print(x)

p.join()

The error I get:

Traceback (most recent call last):
  File "test_torch_queue.py", line 15, in <module>
    x = q.get()
  File "/home/florin/Tools/anaconda3/lib/python3.6/multiprocessing/queues.py", line 113, in get
    return _ForkingPickler.loads(res)
  File "/home/florin/Tools/anaconda3/lib/python3.6/site-packages/torch/multiprocessing/reductions.py", line 72, in rebuild_storage_fd
    fd = df.detach()
  File "/home/florin/Tools/anaconda3/lib/python3.6/multiprocessing/resource_sharer.py", line 57, in detach
    with _resource_sharer.get_connection(self._id) as conn:
  File "/home/florin/Tools/anaconda3/lib/python3.6/multiprocessing/resource_sharer.py", line 87, in get_connection
    c = Client(address, authkey=process.current_process().authkey)
  File "/home/florin/Tools/anaconda3/lib/python3.6/multiprocessing/connection.py", line 493, in Client
    answer_challenge(c, authkey)
  File "/home/florin/Tools/anaconda3/lib/python3.6/multiprocessing/connection.py", line 732, in answer_challenge
    message = connection.recv_bytes(256)         # reject large message
  File "/home/florin/Tools/anaconda3/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/home/florin/Tools/anaconda3/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/home/florin/Tools/anaconda3/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
ConnectionResetError: [Errno 104] Connection reset by peer

Thanks!

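The subprocess needs to still be alive when the main process receives the tensor. As the traceback shows, rebuilding the tensor in q.get() connects back to the sending process to fetch the file descriptor of its shared memory, so if the sender has already exited, the connection is reset. There are two ways to fix that:

  1. Use an Event to synchronize the processes
  2. Use the file_system sharing strategy (not recommended)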

This example should work:

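import torch
import torch.multiprocessing as mp

def put_in_q(idx, q, evt):
    q.put(torch.IntTensor(2, 2).fill_(idx))
    # Stay alive until the parent signals that it has received the tensor.
    evt.wait()

q = mp.Queue()

evt = mp.Event()
p = mp.Process(target=put_in_q, args=(0, q, evt))
p.start()

x = q.get()
# The tensor has been rebuilt in this process, so the child may exit now.
evt.set()
print(x)

p.join()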
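
For the second option, a minimal sketch of switching the strategy could look like the following. It assumes the call is made once at startup, before any tensor is put on a queue, and that child processes inherit the setting (with a spawn start method each child would presumably need the same call). As far as I know, the PyTorch docs warn that this strategy can leak shared-memory files if a process crashes, which is likely why it is not recommended.

```python
import torch.multiprocessing as mp

# Show which strategies this platform supports and which one is active.
print(mp.get_all_sharing_strategies())
print(mp.get_sharing_strategy())

# Assumption: called once, before any tensor is sent between processes.
# Tensors are then shared through named shared-memory files instead of
# file descriptors fetched from the sending process.
mp.set_sharing_strategy("file_system")
print(mp.get_sharing_strategy())
```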

To synchronize a larger number of processes, you can use mp.Barrier (available only in Python 3).
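
A rough sketch of the Barrier variant, under a few assumptions: the helper name gather_tensors is made up for illustration, the "fork" start method is requested explicitly (so this is Unix-only), and the barrier counts every worker plus the parent. Each worker waits at the barrier after its put, and the parent only reaches the barrier once it has received every tensor, so no worker exits early.

```python
import torch
import torch.multiprocessing as mp

def put_in_q(idx, q, barrier):
    q.put(torch.full((2, 2), idx, dtype=torch.int32))
    # Stay alive until the parent has received every tensor.
    barrier.wait()

def gather_tensors(n_workers):
    # Hypothetical helper. Assumption: the "fork" start method (Unix only).
    ctx = mp.get_context("fork")
    q = ctx.Queue()
    # One party per worker, plus one for the parent process.
    barrier = ctx.Barrier(n_workers + 1)
    procs = [ctx.Process(target=put_in_q, args=(i, q, barrier))
             for i in range(n_workers)]
    for p in procs:
        p.start()
    # Receive one tensor per worker while all workers are still alive.
    tensors = [q.get() for _ in range(n_workers)]
    # Release the workers only after everything has been received.
    barrier.wait()
    for p in procs:
        p.join()
    return tensors

if __name__ == "__main__":
    for t in gather_tensors(3):
        print(t)
```

The tensors can arrive in any order, so the index has to be read from the tensor contents rather than from the position in the list.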

Thank you for the quick reply!