Hi,
I’m trying to use torch.multiprocessing.Queue to transfer torch.tensor objects between processes (one consumer and many producers), and I found that the consumer is very slow. I'd like to know how to fix this, or whether I'm doing something wrong.
My simplified code is below, with one consumer and two producers. Each producer puts one torch.tensor (of size 720*1280*3) on a queue every 5 ms; the consumer just calls queue.get to get the tensor back and does no other work. Even so, the consumer is too slow to keep up with the producers, and tensors pile up in the queue.
import os
import time
import argparse
from torch.multiprocessing import Process, Queue, Lock
import torch

parser = argparse.ArgumentParser(description='data loader')
parser.add_argument('-j', '--workers', default=2, type=int, metavar='N',
                    help='number of workers (default: 2)')


def get_ten(tens, lock):
    #lock.acquire()
    if not tens.empty():
        ten = tens.get()
    else:
        ten = None
    #lock.release()
    return ten


def put_ten(tens, ten, lock):
    #lock.acquire()
    tens.put(ten)
    #lock.release()


def worker(tens, process_index, tenlock):
    #affinity_mask = {process_index+1}
    #os.sched_setaffinity(0, affinity_mask)
    #torch.set_num_threads(1)
    while True:
        time.sleep(0.005)
        ten = torch.ones([720, 1280, 3])
        put_ten(tens, ten, tenlock)


def main():
    args = parser.parse_args()
    #affinity_mask = {0}
    #os.sched_setaffinity(0, affinity_mask)
    #torch.set_num_threads(1)
    tens = Queue()
    process_count = args.workers
    process_list = []
    tenlock = Lock()
    for i in range(process_count):
        p = Process(target=worker, args=(tens, i, tenlock))
        p.start()
        process_list.append(p)
    ten_count = 0
    while True:
        ten = get_ten(tens, tenlock)
        if ten is not None:
            ten_count = ten_count + 1
            if ten_count % 500 == 0:
                print("in consumer process, get %d tensors, still %d left in the queue" % (ten_count, tens.qsize()))
        else:
            print("in consumer process, do not get a tensor")
            time.sleep(0.01)
    for p in process_list:
        p.join()


if __name__ == '__main__':
    main()
The output looks like:
in consumer process, do not get a tensor
in consumer process, do not get a tensor
in consumer process, do not get a tensor
in consumer process, do not get a tensor
in consumer process, get 500 tensors, still 191 left in the queue
in consumer process, get 1000 tensors, still 272 left in the queue
in consumer process, get 1500 tensors, still 284 left in the queue
in consumer process, get 2000 tensors, still 323 left in the queue
in consumer process, get 2500 tensors, still 338 left in the queue
in consumer process, get 3000 tensors, still 357 left in the queue
in consumer process, get 3500 tensors, still 385 left in the queue
in consumer process, get 4000 tensors, still 401 left in the queue
in consumer process, get 4500 tensors, still 395 left in the queue
in consumer process, get 5000 tensors, still 408 left in the queue
in consumer process, get 5500 tensors, still 415 left in the queue
in consumer process, get 6000 tensors, still 435 left in the queue
in consumer process, get 6500 tensors, still 457 left in the queue
in consumer process, get 7000 tensors, still 478 left in the queue
in consumer process, get 7500 tensors, still 513 left in the queue
in consumer process, get 8000 tensors, still 519 left in the queue
in consumer process, get 8500 tensors, still 516 left in the queue
in consumer process, get 9000 tensors, still 547 left in the queue
in consumer process, get 9500 tensors, still 555 left in the queue
in consumer process, get 10000 tensors, still 560 left in the queue
in consumer process, get 10500 tensors, still 581 left in the queue
in consumer process, get 11000 tensors, still 604 left in the queue
in consumer process, get 11500 tensors, still 633 left in the queue
...
The backlog keeps growing, and eventually the system runs out of memory.
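For a sense of scale, here is a rough estimate of how much data the producers are offering. This is only a sketch under assumptions that are not measured above: the tensors are float32 (torch's default dtype, so 4 bytes per element), and each producer really manages one put every 5 ms, which is an upper bound because creating the tensor and calling put also take time. It needs nothing beyond plain Python.

```python
# Back-of-the-envelope numbers for the setup above.
# Assumptions (not measured): float32 elements (torch's default dtype) and
# every producer achieving one put() per 5 ms, which is an upper bound.

shape = (720, 1280, 3)
bytes_per_element = 4      # float32
producers = 2
put_interval_s = 0.005     # time.sleep(0.005) in worker()

# Number of elements in one tensor.
elements = 1
for dim in shape:
    elements *= dim

bytes_per_tensor = elements * bytes_per_element
tensors_per_second = producers / put_interval_s
bytes_per_second = bytes_per_tensor * tensors_per_second

print(f"one tensor: {bytes_per_tensor / 2**20:.1f} MiB")
print(f"offered load: {tensors_per_second:.0f} tensors/s, "
      f"{bytes_per_second / 2**30:.2f} GiB/s")
# Size of a backlog comparable to the last qsize() values in the log.
print(f"600 queued tensors: {600 * bytes_per_tensor / 2**30:.2f} GiB")
```

So each tensor is roughly 10.5 MiB, and a backlog of a few hundred of them already amounts to several GiB, which fits with the machine eventually running out of memory.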