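I am seeing occasional large delays when a child process receives a GPU tensor through `multiprocessing.Queue.get()` and when it then releases (`del`) that tensor.
Most iterations are fast, but every so often either the `get()` or the `del` takes hundreds of milliseconds.
This is sample code that reproduces the problem: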
from threading import Thread
import torch
import torch.cuda as cuda
import torch.multiprocessing as mp
import os
import cv2
import time
def proc(video_path):
    """Decode every frame of a video and copy each one to the GPU.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.

    Returns:
        A list with one CUDA tensor per frame that was read, each permuted
        with ``permute(2, 0, 1)`` (presumably HxWxC -> CxHxW; confirm against
        the capture's frame layout). The list is empty when the capture
        cannot be opened or yields no frames.
    """
    frame_list = []
    cap = cv2.VideoCapture(video_path)
    try:
        if cap.isOpened():
            # The target device does not change per frame, so build it once.
            device = torch.device("cuda")
            while True:
                ret, frame = cap.read()
                if not ret:
                    # read() reported no frame: stop decoding.
                    break
                infe_frame = torch.from_numpy(frame).to(device, non_blocking=True)
                frame_list.append(infe_frame.permute(2, 0, 1))
    finally:
        # Always give the capture handle back, even if a read or upload raises.
        cap.release()
    return frame_list
def put_frame(frame_list, mp_queue):
    """Feed every item of ``frame_list`` into ``mp_queue``, one at a time.

    Args:
        frame_list: Iterable of items to enqueue, in order.
        mp_queue: Object exposing ``put(item)``.

    After each ``put`` the producer sleeps for 1 ms; once all items are
    queued it prints a completion message.
    """
    enqueue = mp_queue.put
    pause = time.sleep
    for item in frame_list:
        enqueue(item)
        pause(0.001)  # throttle the producer between consecutive puts
    print('put frame done')
def new_process(mp_queue):
    """Consumer loop: time how long each ``get()`` and each ``del`` takes.

    Args:
        mp_queue: Queue exposing ``empty()`` and ``get()``.

    Polls the queue every 1 ms while it is empty. For every item it prints
    the time spent in ``get()`` and the time spent dropping the reference.
    The loop never terminates on its own.

    NOTE(review): when the items are CUDA tensors sent from another process,
    ``get()`` presumably has to open the sender's memory through CUDA IPC and
    ``del`` has to close it again, which may be where the spikes come from.
    Confirm against the PyTorch multiprocessing notes.
    """
    # perf_counter() is monotonic and high-resolution, so it is the right
    # clock for short intervals; time.time() is wall-clock and may jump.
    clock = time.perf_counter
    while True:
        if mp_queue.empty():
            time.sleep(0.001)
            continue
        get_start = clock()
        data = mp_queue.get()
        get_end = clock()
        del_start = clock()
        del data
        del_end = clock()
        print(f'get_time:{get_end-get_start}/del_time:{del_end-del_start}')
if __name__ == '__main__':
    # Load all frames up front, before any worker is started.
    video_path = '/my/video/path.mp4'
    frame_list = proc(video_path)

    # Force the 'spawn' start method for the consumer process.
    mp.set_start_method('spawn', force=True)
    mp_queue = mp.Queue()

    consumer = mp.Process(target=new_process, args=(mp_queue,))
    producer = Thread(target=put_frame, args=(frame_list, mp_queue))

    # Start the consumer first, then the producer; join in the same order.
    workers = (consumer, producer)
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
Output (lines marked `<<=` show the large delays):
get_time:0.0002560615539550781/del_time:0.00012421607971191406
get_time:0.0007522106170654297/del_time:0.18433046340942383 <<=
get_time:0.2054443359375/del_time:0.1036233901977539 <<=
get_time:0.0014698505401611328/del_time:0.0005314350128173828
get_time:0.0015175342559814453/del_time:0.0009710788726806641
get_time:0.0019102096557617188/del_time:0.19025516510009766 <<=
get_time:0.3015882968902588/del_time:0.0010426044464111328 <<=
get_time:0.00171661376953125/del_time:0.0005104541778564453
get_time:0.0005970001220703125/del_time:0.0002644062042236328
However, this does not happen if I replace the child process with a `threading.Thread` (and the `mp.Queue` with a `collections.deque`).
This is the code when using threading:
from threading import Thread
import torch
import torch.cuda as cuda
from collections import deque
import os
import cv2
import time
def proc(video_path):
... same frame load code ...
return frame_list
def put_frame(frame_list, mp_queue):
    """Append every item of ``frame_list`` to ``mp_queue``, one at a time.

    Args:
        frame_list: Iterable of items to enqueue, in order.
        mp_queue: Object exposing ``append(item)``; despite the name, the
            caller in this script passes a ``collections.deque``.

    After each ``append`` the producer sleeps for 1 ms; once all items are
    queued it prints a completion message.
    """
    push = mp_queue.append
    pause = time.sleep
    for item in frame_list:
        push(item)
        pause(0.001)  # throttle the producer between consecutive appends
    print('put frame done')
def new_thread(de_queue):
    """Consumer loop: time how long each ``popleft()`` and each ``del`` takes.

    Args:
        de_queue: Container supporting truthiness and ``popleft()``; the
            caller in this script passes a ``collections.deque``.

    Polls the container every 1 ms while it is empty. For every item it
    prints the time spent in ``popleft()`` and the time spent dropping the
    reference. The loop never terminates on its own.
    """
    # perf_counter() is monotonic and high-resolution, so it is the right
    # clock for short intervals; time.time() is wall-clock and may jump.
    clock = time.perf_counter
    while True:
        if not de_queue:
            time.sleep(0.001)
            continue
        get_start = clock()
        data = de_queue.popleft()
        get_end = clock()
        del_start = clock()
        del data
        del_end = clock()
        print(f'get_time:{get_end-get_start}/del_time:{del_end-del_start}')
if __name__ == '__main__':
    # Load all frames up front and report how many were read.
    video_path = '/my/video/path.mp4'
    frame_list = proc(video_path)
    print(f'frame load done : {len(frame_list)}')

    # A plain deque is shared between the two threads.
    de_queue = deque()

    consumer = Thread(target=new_thread, args=(de_queue,))
    producer = Thread(target=put_frame, args=(frame_list, de_queue))

    # Start the consumer first, then the producer; join in the same order.
    workers = (consumer, producer)
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
Output:
get_time:2.86102294921875e-06/del_time:7.152557373046875e-07
get_time:2.86102294921875e-06/del_time:7.152557373046875e-07
get_time:2.86102294921875e-06/del_time:4.76837158203125e-07
get_time:2.86102294921875e-06/del_time:7.152557373046875e-07
get_time:2.6226043701171875e-06/del_time:7.152557373046875e-07
get_time:7.152557373046875e-07/del_time:4.76837158203125e-07
get_time:2.6226043701171875e-06/del_time:9.5367431640625e-07
get_time:2.6226043701171875e-06/del_time:4.76837158203125e-07
get_time:2.86102294921875e-06/del_time:7.152557373046875e-07
Why is this happening, and how can I fix it?
Thank you.