I’m trying to use torch.multiprocessing module to handle multiple cameras with 1 model instance. But I always get RuntimeError:
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
My mp.set_start_method('spawn', force=True)
is wrapped inside name==main
from glob import glob
import torch._inductor.config
import torch.multiprocessing as mp
from tqdm import trange
from camera_manager import CameraManager
OUT_PATH = "video/out_grid_dev.mp4"
MAX_SECONDS = 30
FPS = 19
MAX_FRAMES = FPS * MAX_SECONDS
def main():
camera_manager = CameraManager()
video_paths = list(glob("video/*.avi"))[:4]
for video_path in video_paths:
camera_manager.add_camera(video_path)
for _ in trange(MAX_FRAMES, desc="Frame capturing"):
frames = camera_manager.get_frames()
...
if __name__ == '__main__':
mp.set_start_method('spawn', force=True)
torch._inductor.config.compile_threads = 1
main()
CameraManager is creating new Cameras, which inherit from BaseCamera
class CameraEvent:
def __init__(self):
self.manager = mp.Manager()
self.events = self.manager.dict()
self.timeout = 5
def wait(self):
ident = mp.current_process().name
if ident not in self.events:
self.events[ident] = [mp.Event(), time.time()]
return self.events[ident][0].wait()
def set(self):
now = time.time()
remove = None
for ident, event in self.events.items():
if not event[0].is_set():
event[0].set()
event[1] = now
else:
if now - event[1] > self.timeout:
remove = ident
if remove:
del self.events[remove]
def clear(self):
self.events[mp.current_process().name][0].clear()
class BaseCamera:
processes = {}
manager = mp.Manager()
frame = manager.dict()
last_access = manager.dict()
event = {}
def __init__(self, device, model, encoder, tracker_manager):
BaseCamera.event[device] = CameraEvent()
if device not in BaseCamera.processes:
BaseCamera.processes[device] = None
if BaseCamera.processes[device] is None:
BaseCamera.last_access[device] = time.time()
BaseCamera.processes[device] = mp.Process(
target=self._thread,
args=(device, model, encoder, tracker_manager)
)
BaseCamera.processes[device].start()
while self.get_frame(device) is None:
time.sleep(0)
@classmethod
def get_frame(cls, unique_name):
BaseCamera.last_access[unique_name] = time.time()
BaseCamera.event[unique_name].wait()
BaseCamera.event[unique_name].clear()
return BaseCamera.frame[unique_name]
@staticmethod
def frames(cap, unique_name, model, encoder, tracker_manager):
raise RuntimeError('Must be implemented by subclasses')
@classmethod
def thread(cls, device, model, encoder, tracker_manager):
unique_name = (device.split("-")[1], device)
cap = cv2.VideoCapture(device)
frames_iterator = cls.frames(cap=cap, unique_name=unique_name, model=model,
encoder=encoder, tracker_manager=tracker_manager)
try:
for frame in frames_iterator:
BaseCamera.frame[device] = frame
BaseCamera.event[device].set()
time.sleep(0)
if time.time() - BaseCamera.last_access[device] > 10:
frames_iterator.close()
except Exception as e:
BaseCamera.event[device].set()
frames_iterator.close()
@classmethod
def _thread(cls, device, model, encoder, tracker_manager):
cls.thread(device, model, encoder, tracker_manager)
BaseCamera.processes[device] = None
How can I fix it?