The consumer is very slow using torch.tensor with torch.multiprocessing.Queue

hi,

I’m trying to use torch.multiprocessing.Queue to transfer torch.tensor objects between processes (one consumer and many producers), and I found that the consumer is very, very slow. I wonder how to fix it, or whether I did something wrong.

My simplified code is below; it has one consumer and two producers. Each producer puts one torch.tensor (of size 720*1280*3) into the queue every 5 ms, and the consumer just calls queue.get to retrieve the tensors, without any additional work. But the consumer is too slow to keep up with the tensors in the queue.

import os
import time
import argparse
from torch.multiprocessing import Process, Queue, Lock
import torch

parser = argparse.ArgumentParser(description='data loader')
parser.add_argument('-j', '--workers', default=2, type=int, metavar='N',
                    help='number of workers (default: 2)')

def get_ten(tens, lock):
    # non-blocking get: return a tensor if one is available, otherwise None
    #lock.acquire()
    if not tens.empty():
        ten = tens.get()
    else:
        ten = None
    #lock.release()
    return ten

def put_ten(tens, ten, lock):
    #lock.acquire()
    tens.put(ten)
    #lock.release()

def worker(tens, process_index, tenlock):
    # producer: push a new 720x1280x3 tensor into the queue every 5 ms
    #affinity_mask = {process_index+1}
    #os.sched_setaffinity(0, affinity_mask)
    #torch.set_num_threads(1)
    while True:
        time.sleep(0.005)
        ten = torch.ones([720, 1280, 3])
        put_ten(tens, ten, tenlock)

def main():
    args = parser.parse_args()
    #affinity_mask = {0}
    #os.sched_setaffinity(0, affinity_mask)
    #torch.set_num_threads(1)

    tens = Queue()
    process_count = args.workers
    process_list = []
    tenlock = Lock()
    for i in range(process_count):
        p = Process(target=worker, args=(tens, i, tenlock))
        p.start()
        process_list.append(p)

    ten_count = 0
    # consumer loop: drain the queue and report progress every 500 tensors
    while True:
        ten = get_ten(tens, tenlock)
        if ten is not None:
            ten_count = ten_count + 1
            if ten_count % 500 == 0:
                print("in consumer process, get %d tensors, still %d left in the queue" % (ten_count, tens.qsize()))
        else:
            print("in consumer process, do not get a tensor")
            time.sleep(0.01)

    for p in process_list:
        p.join()

if __name__ == '__main__':
    main()

The output looks like:

in consumer process, do not get a tensor
in consumer process, do not get a tensor
in consumer process, do not get a tensor
in consumer process, do not get a tensor
in consumer process, get 500 tensors, still 191 left in the queue
in consumer process, get 1000 tensors, still 272 left in the queue
in consumer process, get 1500 tensors, still 284 left in the queue
in consumer process, get 2000 tensors, still 323 left in the queue
in consumer process, get 2500 tensors, still 338 left in the queue
in consumer process, get 3000 tensors, still 357 left in the queue
in consumer process, get 3500 tensors, still 385 left in the queue
in consumer process, get 4000 tensors, still 401 left in the queue
in consumer process, get 4500 tensors, still 395 left in the queue
in consumer process, get 5000 tensors, still 408 left in the queue
in consumer process, get 5500 tensors, still 415 left in the queue
in consumer process, get 6000 tensors, still 435 left in the queue
in consumer process, get 6500 tensors, still 457 left in the queue
in consumer process, get 7000 tensors, still 478 left in the queue
in consumer process, get 7500 tensors, still 513 left in the queue
in consumer process, get 8000 tensors, still 519 left in the queue
in consumer process, get 8500 tensors, still 516 left in the queue
in consumer process, get 9000 tensors, still 547 left in the queue
in consumer process, get 9500 tensors, still 555 left in the queue
in consumer process, get 10000 tensors, still 560 left in the queue
in consumer process, get 10500 tensors, still 581 left in the queue
in consumer process, get 11000 tensors, still 604 left in the queue
in consumer process, get 11500 tensors, still 633 left in the queue
...

And eventually the system runs out of memory.

When I uncomment these lines:

    #affinity_mask = {process_index+1}
    #os.sched_setaffinity(0, affinity_mask)
    #torch.set_num_threads(1)

I can get the following output without OOM. I think not setting CPU affinity caused the slowdown you’ve seen.

in consumer process, do not get a tensor
in consumer process, do not get a tensor
in consumer process, do not get a tensor
in consumer process, get 500 tensors, still 222 left in the queue
in consumer process, get 1000 tensors, still 223 left in the queue
in consumer process, get 1500 tensors, still 223 left in the queue
in consumer process, get 2000 tensors, still 219 left in the queue
in consumer process, get 2500 tensors, still 223 left in the queue
in consumer process, get 3000 tensors, still 223 left in the queue
in consumer process, get 3500 tensors, still 219 left in the queue
in consumer process, get 4000 tensors, still 224 left in the queue
in consumer process, get 4500 tensors, still 225 left in the queue
in consumer process, get 5000 tensors, still 231 left in the queue

Thanks @wayi, but actually the issue is still there when I uncomment these three lines.

  1. I think you’ll still see OOM when running for a longer time. I verified this on my machine: as long as the number of tensors left in the queue keeps increasing, we will always hit OOM eventually.

  2. Each producer puts only 1000/5 = 200 tensors per second into the queue, which is 400 tensors per second in total. Given the asynchronous nature, I would expect to see only a few (0–5) tensors left in the queue. Seeing 2xx tensors left in the queue is still an issue, even if there is no OOM in the end.

  3. I tried different tensor sizes, and there is no such issue if the tensor is very small. I just wonder why: 400 720p (1280*720) RGB24 (3 bytes per pixel) images per second should not be a bottleneck, even if they are shared between processes via memory copies (see the quick arithmetic below).
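
For reference, here is some rough arithmetic for the data rate implied by the script above (plain arithmetic, not a measurement; note that torch.ones creates float32 tensors, so each element is 4 bytes rather than the 1 byte of RGB24):

# Rough data-rate arithmetic for the repro script above (not a benchmark).
elements_per_tensor = 720 * 1280 * 3        # 2,764,800 elements per tensor
rgb24_bytes = elements_per_tensor * 1       # ~2.8 MB if stored as 1 byte per channel
float32_bytes = elements_per_tensor * 4     # ~11.1 MB for the float32 tensors torch.ones creates
tensors_per_second = 2 * (1000 // 5)        # 2 producers, one tensor every 5 ms = 400/s
print("RGB24 estimate : %.2f GB/s" % (tensors_per_second * rgb24_bytes / 1e9))
print("float32 actual : %.2f GB/s" % (tensors_per_second * float32_bytes / 1e9))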

thanks

Thanks for the explanation!

Please check the best practices here: Multiprocessing best practices — PyTorch 1.9.1 documentation

Two suggestions (rough sketches below):

  1. You can replace Queue with SimpleQueue. This seems to work for me.
  2. You can consider reusing the buffers passed through the queue to avoid additional memory copies.
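
For example, a minimal sketch of suggestion 1, keeping the same producer/consumer shape as your script: import SimpleQueue directly from torch.multiprocessing and use a local counter instead of tens.qsize(), since SimpleQueue does not provide qsize() (everything else is just simplified from your code):

# Sketch: same shape as the repro script, but with SimpleQueue and a local counter.
import time
import torch
from torch.multiprocessing import Process, SimpleQueue

def worker(tens):
    while True:
        time.sleep(0.005)
        tens.put(torch.ones([720, 1280, 3]))

def main():
    tens = SimpleQueue()
    for _ in range(2):
        Process(target=worker, args=(tens,), daemon=True).start()

    ten_count = 0
    while True:
        if not tens.empty():
            tens.get()
            ten_count += 1
            if ten_count % 500 == 0:
                print("in consumer process, got %d tensors so far" % ten_count)
        else:
            time.sleep(0.01)

if __name__ == '__main__':
    main()

And a rough sketch of suggestion 2: pre-allocate a small pool of shared-memory buffers and pass them back and forth, so no new tensor is allocated per message. The pool size and the free_q/full_q names are made up for illustration, and only one producer is shown:

# Sketch: reuse a fixed pool of shared-memory buffers instead of allocating new tensors.
import torch
from torch.multiprocessing import Process, SimpleQueue

POOL_SIZE = 8  # arbitrary, for illustration

def producer(free_q, full_q):
    while True:
        buf = free_q.get()   # a buffer the consumer has finished with
        buf.fill_(1.0)       # write new data in place; no fresh allocation
        full_q.put(buf)

def main():
    free_q, full_q = SimpleQueue(), SimpleQueue()
    for _ in range(POOL_SIZE):
        t = torch.empty([720, 1280, 3])
        t.share_memory_()    # move the buffer to shared memory once, up front
        free_q.put(t)
    Process(target=producer, args=(free_q, full_q), daemon=True).start()

    while True:
        ten = full_q.get()   # shares storage with the producer's buffer
        # ... consume ten here ...
        free_q.put(ten)      # hand the buffer back for reuse

if __name__ == '__main__':
    main()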

thanks.

It looks like there’s a typo for SimpleQueue in the best practices. With

from torch.multiprocessing.queues import SimpleQueue

we’ll get

ModuleNotFoundError: No module named 'torch.multiprocessing.queues'

Then I tried with

from torch.multiprocessing.queue import SimpleQueue
tens = SimpleQueue()

and the error message is:

    tens = SimpleQueue()
TypeError: __init__() missing 1 required keyword-only argument: 'ctx'

I just think the memory copy of 400 720p (1280*720) RGB24 (3 bytes per pixel) images per second is not that big. In other words, even if we reduce the memory copies, it still does not resolve the issue, since that is not the bottleneck.

I actually replaced Queue in your code with SimpleQueue without importing any new package, which means you need to import SimpleQueue directly from torch.multiprocessing.

I see the error below if I just replace Queue in the code:

    tens = SimpleQueue()
NameError: name 'SimpleQueue' is not defined

And I see another error if I change the code to use torch.multiprocessing.SimpleQueue():

    print("in consumer process, get %d tensors, still %d left in the queue" % (ten_count, tens.qsize()))
AttributeError: 'SimpleQueue' object has no attribute 'qsize'

The best-practices page says multiprocessing.queues.SimpleQueue, which is why I think there might be a typo there.

tens = SimpleQueue()

NameError: name 'SimpleQueue' is not defined

You also need to import SimpleQueue, i.e., change

from torch.multiprocessing import Process, **Queue**, Lock

to

from torch.multiprocessing import Process, **SimpleQueue**, Lock

print("in consumer process, get %d tensors, still %d left in the queue" % (ten_count, tens.qsize()))

AttributeError: 'SimpleQueue' object has no attribute 'qsize'

I guess you can just avoid qsize in the print statement. If you still want to print something, you can maintain a separate counter instead.

tens.qsize() shows how slow the consumer process is, and we cannot mock it with another counter, since it is the result of the communication between multiple processes.

One workaround could be to call an allreduce to sum up the counters on each process, and hence know how many tensors have been consumed in total. You can adjust the frequency of the allreduce. I agree that this may not be elegant; hopefully other people can give better suggestions here.
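
For example, a self-contained sketch of the allreduce-the-counters idea (the gloo backend, the port number, and the pretend per-process counts are just placeholders for illustration; this is not wired into the script above):

# Sketch: sum per-process counters with all_reduce over the gloo backend.
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def run(rank, world_size):
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"   # arbitrary free port
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    counter = torch.tensor([100 * (rank + 1)], dtype=torch.int64)  # pretend local count
    dist.all_reduce(counter, op=dist.ReduceOp.SUM)                 # in-place sum across processes
    if rank == 0:
        print("total count across %d processes: %d" % (world_size, counter.item()))
    dist.destroy_process_group()

if __name__ == '__main__':
    world_size = 3
    mp.spawn(run, args=(world_size,), nprocs=world_size)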

thanks,

But let’s come back to the original issue: why is the consumer so slow when using torch.tensor with torch.multiprocessing.Queue? I would expect this to be resolvable with a very simple fix, so that tensors can be shared between processes with acceptable performance. Otherwise we have to admit that the current implementation needs improvement :frowning: so that other people (maybe me, maybe not :slight_smile:) can start working on the improvement when they have bandwidth.

I am actually not familiar with these libraries either. It could be a good idea to file a bug on GitHub.