I’m pretty bad at Python. I trained a PyTorch model that works nicely for my needs, and I now need to serve it so I can call it from another application. Latency matters, but throughput matters even more. Right now the model runs on the GPU, and I can see one CPU thread at 100% while the other 11 sit idle. When I run it on the CPU instead, average CPU usage stays a bit under 50%. What I would like is a pool of 3 models, 2 on CPU and 1 on GPU, with the server grabbing whichever model is available and favoring the GPU one when it is free.
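Conceptually, the behaviour I’m after is roughly the sketch below: a blocking pool that always hands out the GPU copy when it is free and otherwise one of the CPU copies (the function names are made up and nothing here is tested):

import itertools
import queue

_counter = itertools.count()        # tie-breaker so two models are never compared directly
model_pool = queue.PriorityQueue()  # lowest priority number comes out first

def add_to_pool(model, priority):
    # priority 0 = the GPU copy (preferred), 1 = a CPU copy
    model_pool.put((priority, next(_counter), model))

def run_prediction(image, conf):
    priority, _, model = model_pool.get()  # blocks until some model is free
    try:
        return model.predict(image, conf=conf)
    finally:
        # hand the model back so the next request can grab it
        model_pool.put((priority, next(_counter), model))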
Here is what I built so far (non-essential lines removed):
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Device: ", DEVICE)

CHECKPOINT_DIR = f'{HOME}/checkpoints'
EXPERIMENT_NAME = "symbol_detection"
MODEL_ARCH = Models.YOLO_NAS_L
RUN_NAME = get_most_recent_directory(f"{CHECKPOINT_DIR}/{EXPERIMENT_NAME}")

best_model = models.get(
    MODEL_ARCH,
    num_classes=1,
    checkpoint_path=f"{CHECKPOINT_DIR}/{EXPERIMENT_NAME}/{RUN_NAME}/model.pth"
).to(DEVICE)

CONFIDENCE_THRESHOLD = 0.70

app = FastAPI()

@app.post("/predict", response_class=JSONResponse)
async def predict(request: Request, file: UploadFile = File(...)):
    # Read the image from the request
    contents = await file.read()
    image = Image.open(io.BytesIO(contents)).convert('RGB')

    # Make predictions
    result = best_model.predict(image, conf=CONFIDENCE_THRESHOLD)

    # Extract the relevant fields from the DetectionPrediction result
    boxes = result.prediction.bboxes_xyxy.tolist()
    labels = result.prediction.labels.tolist()
    scores = result.prediction.confidence.tolist()

    # Create a JSON-serializable response
    response_data = {
        "boxes": boxes,
        "labels": labels,
        "scores": scores
    }
    return JSONResponse(response_data)
How can I create 3 “best_model” instances (1 on GPU, 2 on CPU) and have each request wait for one of them to become available before calling predict on it? I tried a ProcessPoolExecutor, but I couldn’t get workers that hold models with different parameters, let alone favor the GPU one. Any idea how such a thing can be done?
Google Gemini proposed the code below. I like it, but I think there is a race condition: if two threads check is_available at the same time, both can see it as true and things will start getting funky. In the Java world I would just use synchronized, but how can I do something similar in Python? Or does PyTorch already have a pooling API of its own? (See my rough locking sketch after Gemini’s code below for what I have in mind.)
Thanks.
import asyncio
import io
from concurrent.futures import ThreadPoolExecutor

import torch
from fastapi import FastAPI, UploadFile, File, Request
from fastapi.responses import JSONResponse
from PIL import Image
# ... your existing code ...
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Device: ", DEVICE)

CHECKPOINT_DIR = f'{HOME}/checkpoints'
EXPERIMENT_NAME = "symbol_detection"
MODEL_ARCH = Models.YOLO_NAS_L
RUN_NAME = get_most_recent_directory(f"{CHECKPOINT_DIR}/{EXPERIMENT_NAME}")

# Create a pool of models
models_pool = []

# Add the GPU model
if torch.cuda.is_available():
    gpu_model = models.get(
        MODEL_ARCH,
        num_classes=1,
        checkpoint_path=f"{CHECKPOINT_DIR}/{EXPERIMENT_NAME}/{RUN_NAME}/model.pth"
    ).to('cuda')
    gpu_model.is_available = True  # simple free/busy flag stored on the model
    models_pool.append(gpu_model)

# Add the CPU models
for _ in range(2):
    cpu_model = models.get(
        MODEL_ARCH,
        num_classes=1,
        checkpoint_path=f"{CHECKPOINT_DIR}/{EXPERIMENT_NAME}/{RUN_NAME}/model.pth"
    ).to('cpu')
    cpu_model.is_available = True
    models_pool.append(cpu_model)

# Create a ThreadPoolExecutor with one worker per model
executor = ThreadPoolExecutor(max_workers=len(models_pool))

CONFIDENCE_THRESHOLD = 0.70

app = FastAPI()
@app.post("/predict", response_class=JSONResponse)
async def predict(request: Request, file: UploadFile = File(...)):
# Read the image from the request
contents = await file.read()
image = Image.open(io.BytesIO(contents)).convert('RGB')
# Submit the prediction task to the executor, prioritizing the GPU model
future = executor.submit(predict_with_available_model, image, CONFIDENCE_TRESHOLD)
result = await asyncio.wrap_future(future) # Make it awaitable
# ... (rest of your prediction processing code) ...
def predict_with_available_model(image, CONFIDENCE_TRESHOLD):
# Try to acquire the GPU model first
for model in models_pool:
if model.device.type == 'cuda' and model.is_available():
try:
model.is_available() = False # Mark as in use
result = model.predict(image, conf=CONFIDENCE_TRESHOLD)
model.is_available() = True # Mark as available again
return result
except Exception as e:
print(f"Error using GPU model: {e}")
# If the GPU model is not available, use a CPU model
for model in models_pool:
if model.device.type == 'cpu' and model.is_available():
try:
model.is_available() = False
result = model.predict(image, conf=CONFIDENCE_TRESHOLD)
model.is_available() = True
return result
except Exception as e:
print(f"Error using CPU model: {e}")
return None # Or handle the case where no model is available
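This is the rough locking sketch I mentioned above. As far as I understand, threading.Lock is the closest Python equivalent to a synchronized block; nothing here is tested and the helper names are made up:

import threading

pool_lock = threading.Lock()  # my stand-in for Java's "synchronized"

def acquire_model():
    # Checking and claiming a model happens atomically under the lock,
    # so two threads can no longer both see the same model as free.
    with pool_lock:
        # GPU copies first, then CPU copies
        ordered = sorted(
            models_pool,
            key=lambda m: 0 if next(m.parameters()).device.type == 'cuda' else 1,
        )
        for model in ordered:
            if model.is_available:
                model.is_available = False
                return model
    return None  # nothing free right now

def release_model(model):
    with pool_lock:
        model.is_available = True

def predict_with_available_model(image, conf_threshold):
    model = acquire_model()
    if model is None:
        return None  # or block/retry until a model is released
    try:
        return model.predict(image, conf=conf_threshold)
    finally:
        release_model(model)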
I’ve fully explored this approach, and it doesn’t work. Uvicorn spawns a separate Python process for each worker, so each worker builds its own pool of 3 models, and they all try to use the same GPU, which leads to conflicts. I read a bit about aiocache, but serializing and de-serializing the model back and forth would probably kill the benefit.
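For context, I start the server roughly like this (the module name and port are made up), which is what spawns the multiple workers in the first place:

# Hypothetical launch script. With workers=4, uvicorn starts 4 separate worker
# processes; each one re-imports the app module on startup, runs the module-level
# code, and therefore builds its own pool of 3 models hitting the same GPU.
import uvicorn

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, workers=4)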