Torch.multiprocessing: how to "reuse buffers passed through a Queue"?

In Multiprocessing best practices, what is meant by the following?

Reuse buffers passed through a Queue

Remember that each time you put a Tensor into a multiprocessing.Queue, it has to be moved into shared memory. If it’s already shared, it is a no-op, otherwise it will incur an additional memory copy that can slow down the whole process. Even if you have a pool of processes sending data to a single one, make it send the buffers back - this is nearly free and will let you avoid a copy when sending next batch.

(from: http://pytorch.org/docs/master/notes/multiprocessing.html#reuse-buffers-passed-through-a-queue)

I’ve just written a simple dummy example below, but I’m not sure how buffers can be reused. I’m assuming the memory of the tensor t is being shared and not copied when transferred between p1 and p2 (because I’m using torch.multiprocessing). However, I’m not sure if it is possible for buffers to be reused since y goes out of scope.

How can I implement this best practice? Is the documentation suggesting p1 and p2 have access to another mp.Queue, which would be used to communicate references to the variables that are currently going out of scope and not being reused?

import torch
import torch.multiprocessing as mp


def fill(q):
  while True:
    t = torch.rand(10000, 10000)
    q.put(t)

def process(q1, q2):
  while True:
    x = q1.get()
    y = torch.pow(x, 10)
    z = torch.sum(y)
    q2.put(z)

if __name__ == '__main__':
  q1 = mp.Queue(maxsize=10)
  q2 = mp.Queue(maxsize=10)
  p1 = mp.Process(target=fill, args=(q1,))
  p2 = mp.Process(target=process, args=(q1, q2))
  p1.start()
  p2.start()
  
  # Keep the parent alive without busy-waiting on a CPU core
  p1.join()
  p2.join()

If anyone has any ideas, I am still wondering.

Wondering as well. I’m passing 25 * 224 * 224 * 3 float arrays, and also batches of those, through queues, and I’m wary of this line, since I’m sure the transfer could be way more efficient.

What kind of queues are you using, multiprocessing.Queues?

Hi there, I’m still curious if anyone has any ideas. Thanks!

I’m also curious. I didn’t get the “send back”.

OK. This is not really an answer to the question, but the complaint is the same, and I found another solution. Passing objects through an mp.Queue involves not only a copy but also serialization, which is the most expensive part.

So I switched to an almost entirely multithreaded pipeline (and whenever I really need to run different training or inference jobs at the same time, I use multiprocessing, but not with queues).

If you don’t need several training tasks running on the same model at the same time, I would advise using a multithreaded pipeline. Replace all queues with simple lists, and just make sure your list isn’t empty before popping from it. On my problem, this made data sharing about 60x faster.
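Here is a minimal sketch of such a thread-based pipeline, assuming producer and consumer live in the same process. Note that it swaps the bare list from the post for the standard-library queue.Queue, which blocks instead of polling; the names produce, consume and run_pipeline are made up for illustration. Because threads share one address space, only a reference is handed over and nothing is pickled or copied.

```python
import queue
import threading

_DONE = object()  # sentinel telling the consumer to stop


def produce(q, items):
    """Hand each item to the consumer; only a reference is enqueued."""
    for item in items:
        q.put(item)
    q.put(_DONE)


def consume(q, received):
    """Collect items until the sentinel arrives."""
    while True:
        item = q.get()
        if item is _DONE:
            break
        received.append(item)


def run_pipeline(items):
    """Run one producer and one consumer thread; return what was received."""
    q = queue.Queue(maxsize=10)  # bounded, like the queues in the question
    received = []
    threads = [
        threading.Thread(target=produce, args=(q, items)),
        threading.Thread(target=consume, args=(q, received)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return received
```

The objects that come out of run_pipeline(batches) are the very same objects that went in (they compare equal with `is`), which is where the saving over a process-based queue comes from.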

If you do need separate processes (training the same model on multiple batches in parallel), you can use an mp.Array. It takes more effort, but it still made transfers about 20x faster on my task. To do this, you need to use its locking methods and reshape your data.
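A rough sketch of the mp.Array route, using only the standard library. The helper names fill, read and run are hypothetical, and the array is a flat run of doubles, so a multi-dimensional batch would have to be flattened before writing and reshaped after reading. The array is passed to the child as a Process argument, not through a queue.

```python
import multiprocessing as mp


def fill(shared, value):
    """Overwrite the whole shared array in place while holding its lock."""
    with shared.get_lock():
        for i in range(len(shared)):
            shared[i] = value


def read(shared):
    """Copy the current contents out into a plain list, under the lock."""
    with shared.get_lock():
        return shared[:]


def run(n=6, value=1.5):
    """Let a child process fill the array, then read it from the parent."""
    shared = mp.Array("d", n)  # n doubles, zero-initialised, lock included
    child = mp.Process(target=fill, args=(shared, value))
    child.start()
    child.join()
    return read(shared)
```

Calling run() from under an `if __name__ == "__main__":` guard starts the child; the parent then reads back the values the child wrote, without anything having gone through a queue.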

Hope this helps.

But I’m still interested!

I am also curious!

Same here… Would appreciate if somebody can share insights on this.

Also, pointers to properly optimized code that uses torch.multiprocessing and multiprocessing.Queue would be good. Of course one can find some examples by googling, but it is not clear whether these would be following “best practices”.

Thanks!

I have a guess.
It should need two queues: one for receiving (Qr) and one for sending (Qs).

Assuming Qr and Qs are swapped between the worker (run) and main, we can write:

def run(Qr, Qs):
    while True:
        b = Qr.get()
        torch.randn(..., out=b) # reusing the buffer tensor without re-allocating
        Qs.put(b)

def main():
    ...
    for i in range(buffer_size): # allocate buffers
        b = torch.empty(...)
        Qs.put(b)
    ... start processes ...
    while True:
        b = Qr.get() # main's Qr = run's Qs
        ... use b ...
        Qs.put(b) # main's Qs = run's Qr

In this fashion, there should be no more than buffer_size tensors in the system. They are always reused and circulated.
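To make the guess above concrete, here is a runnable sketch under a few assumptions: one worker, CPU tensors, and a tiny hypothetical shape and pool size. The names make_pool, refill, worker and run_demo are invented for illustration. The buffers are moved to shared memory once with share_memory_(), and after that only the same tensors travel back and forth.

```python
import torch
import torch.multiprocessing as mp


def make_pool(pool_size, shape):
    """Allocate the buffers once and move them to shared memory up front."""
    return [torch.zeros(shape).share_memory_() for _ in range(pool_size)]


def refill(buf, step):
    """Overwrite a buffer in place, so no new storage is allocated."""
    return buf.fill_(float(step))


def worker(free_q, ready_q):
    """Take an idle buffer, refill it, and send it back (hypothetical worker)."""
    step = 0
    while True:
        buf = free_q.get()
        if buf is None:  # sentinel: no more work
            break
        step += 1
        ready_q.put(refill(buf, step))


def run_demo(rounds=4, pool_size=2, shape=(3,)):
    """Circulate a fixed pool of buffers between the parent and one worker.

    Assumes rounds >= pool_size.
    """
    free_q, ready_q = mp.Queue(), mp.Queue()
    pool = make_pool(pool_size, shape)
    proc = mp.Process(target=worker, args=(free_q, ready_q))
    proc.start()
    for buf in pool:
        free_q.put(buf)
    seen = []
    for i in range(rounds):
        buf = ready_q.get(timeout=60)
        seen.append(buf[0].item())  # "use" the batch
        # Hand the buffer back for the next fill; the last pool_size buffers
        # are kept so that nothing is left in flight at shutdown.
        if i < rounds - pool_size:
            free_q.put(buf)
    free_q.put(None)
    proc.join()
    # Values the parent's original tensors hold after the worker wrote to them.
    return seen, [t[0].item() for t in pool]
```

Call run_demo() from under an `if __name__ == "__main__":` guard. If the buffers really are shared, the parent's original tensors in pool end up non-zero even though only the worker ever wrote to them.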

Caveat: I see the cost of an additional queue to be huge. It might not be worth the benefit of not reallocating additional shared memory.

Resuming this thread.
Has anyone got an idea of what is meant by reusing the buffers? I didn’t really understand the idea behind the example given above.

Also curious about what is meant by reusing buffers passed through a queue.

And more generally, the documentation on torch.multiprocessing is sparse, and it is unclear what differences there are (if any) from Python’s multiprocessing module.

The documentation is (very likely) wrong. You can’t actually avoid a copy when using multiprocessing.Queue (which torch.multiprocessing.queue inherits from). I totally get why this statement is there though because in C/C++ it is possible to avoid the copy. In this case one would NOT use a queue though.

Beginning with Python 3.8 there is the option to use multiprocessing.shared_memory and achieve something similar to what is possible in C/C++. This is a fairly new feature though, and, frankly, it is a nightmare to figure out exactly what multiprocessing does under the hood because it is heavily OS-dependent. Hence, having that statement there is perfectly reasonable, albeit erroneous. I’d suspect that some smart person will soon submit a PR that allows using multiprocessing.shared_memory instead of multiprocessing.Queue when sharing tensors, if this isn’t already happening as we speak.
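For reference, a small standard-library sketch of what multiprocessing.shared_memory offers (Python 3.8+). To keep it self-contained, it attaches a second handle by name inside the same process; in real use the second handle would be opened in another process. The function name roundtrip is made up for illustration.

```python
from multiprocessing import shared_memory


def roundtrip(values):
    """Write floats through one handle and read them through a second
    handle that is attached to the same block by name."""
    nbytes = 8 * len(values)  # one C double per value
    owner = shared_memory.SharedMemory(create=True, size=nbytes)
    try:
        peer = shared_memory.SharedMemory(name=owner.name)
        try:
            # View both mappings as arrays of doubles; no data is copied here.
            writer = owner.buf[:nbytes].cast("d")
            reader = peer.buf[:nbytes].cast("d")
            for i, v in enumerate(values):
                writer[i] = v
            result = reader.tolist()
            # Views must be released before the mappings can be closed.
            writer.release()
            reader.release()
        finally:
            peer.close()
    finally:
        owner.close()
        owner.unlink()  # free the block once nobody needs it
    return result
```

Only the block's name has to be communicated to the other side; the data itself stays where it is.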

The way “shared process memory” works under the hood is by sharing a memory-mapped file which multiple processes can read and write to. On POSIX-based systems (e.g. Linux) there is also something called a Unix domain socket. This is very similar to the familiar network socket (same API), except that it writes to - you guessed it - a file, which typically resides in memory.

On Linux, roughly speaking, multiprocessing.Queue instantiates such a domain socket (it’s a pretty reasonable analogy) and then uses it to pass data around in the same fashion you’d pass data between two servers. This is why objects get serialized and why copies are unavoidable in this approach. When being sent, objects get placed into the queue (written to the file), and once received, they get evicted from the queue (read from the file). This is what a queue DOES; it also is the reason you don’t want to use this paradigm if you want to avoid copies.
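The serialization step can be mimicked without any processes at all. The sketch below assumes a plain Python object and the default pickling that multiprocessing.Queue applies on put() and get(); the helper name through_queue is hypothetical, and no real pipe is involved. Types that register their own reducer are outside this sketch.

```python
import pickle


def through_queue(obj):
    """Mimic what a plain multiprocessing.Queue does to an object:
    pickle it on put(), unpickle it on get()."""
    payload = pickle.dumps(obj)   # sender side: serialize into bytes
    return pickle.loads(payload)  # receiver side: rebuild a new object


batch = bytearray(16)
received = through_queue(batch)
received[0] = 255  # changes the receiver's copy only
```

The received object is equal to the original right after the round trip, but it is a separate copy, so later writes on one side are not seen on the other.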

On Windows, I am actually not sure what happens (I have not seen good documentation or enlightening examples yet). If I had to guess, I would say Python goes down the mmap route and handles the queue logic itself to match the behavior on Linux. It’s still a queue though, so it will still copy data in and out of the (memory-mapped) file.

On a tangent: Memory mapping implies host-managed, paged memory (though you might be able to turn that off) which means you’ll need an additional copy to pinned memory before sending it to the GPU for inference.
