Hi there!
I am deploying a PyTorch model to a Dataflow streaming pipeline for ML inference. The pipeline's workers each have a GPU attached to accelerate inference. Essentially, Dataflow creates one Python process per core on the worker (16 cores in my case, so 16 Python processes), and each process loads the model onto the GPU.
Occasionally, this loading step gets stuck and never recovers (it's unclear why this happens, but my best guess is some sort of deadlock).
To handle this, I'm trying to implement a timeout so the load can be preempted and retried, or at least so the pipeline doesn't get stuck. Is there a sane way of doing this?
This is the solution I’ve tried:
import logging
import os
import time
from concurrent.futures import CancelledError, ProcessPoolExecutor, TimeoutError

pool = ProcessPoolExecutor(max_workers=1)
logging.info(f"Loading model with timeout of {timeout} seconds.")
start = time.time()
future = pool.submit(load_func)
try:
    model = future.result(timeout=timeout)
    end = time.time()
except (TimeoutError, CancelledError) as e:
    # Don't wait for the child to finish, otherwise shutdown will hang if the process is stuck
    logging.debug("Shutting down executor.")
    pool.shutdown(wait=False)
    logging.debug("Killing child processes.")
    _kill_child_processes(os.getpid())
    raise LoadModelTimeoutError(f"Model loading timed out after {timeout} seconds.") from e
logging.info(f"Model loaded in {end - start:.2f} seconds.")
return model
This seems problematic, though, because AFAIK you cannot kill the producing process (in this case the child process that fulfils the future) while the consuming process is still waiting on its result.
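The closest workaround I can think of is to drop the executor and drive a bare multiprocessing.Process instead, since a Process can be terminate()-ed (and kill()-ed) directly, no matter what the parent is waiting on. Here is a rough, untested sketch of what I mean; load_model_with_timeout and _load_into_queue are just illustrative names, and it makes the same assumptions as my current code, namely that load_func is picklable (the sketch uses the spawn start method) and that whatever it returns can be pickled back to the parent:

import logging
import multiprocessing as mp
import time
from queue import Empty


def _load_into_queue(queue, load_func):
    # Runs in the child: load the model and hand it back to the parent.
    queue.put(load_func())


def load_model_with_timeout(load_func, timeout):
    # "spawn" avoids forking a parent that may already hold CUDA/GPU state.
    ctx = mp.get_context("spawn")
    queue = ctx.Queue()
    proc = ctx.Process(target=_load_into_queue, args=(queue, load_func), daemon=True)

    logging.info(f"Loading model with timeout of {timeout} seconds.")
    start = time.time()
    proc.start()
    try:
        # Wait at most `timeout` seconds for the child to hand back the model.
        model = queue.get(timeout=timeout)
    except Empty as e:
        proc.terminate()          # SIGTERM the stuck child directly
        proc.join(5)
        if proc.is_alive():
            proc.kill()           # escalate to SIGKILL if it ignores SIGTERM
        raise LoadModelTimeoutError(
            f"Model loading timed out after {timeout} seconds."
        ) from e

    proc.join()
    logging.info(f"Model loaded in {time.time() - start:.2f} seconds.")
    return model

I'm not sure how well this plays with Dataflow's worker harness (spawning processes from within the pipeline setup), nor whether shipping a GPU-resident model back through a queue is a good idea at all, which is why I'd love to hear if there is a saner pattern for this.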