Combined CPU + GPU inference

Hi,

I’m not very experienced with Python. I trained a PyTorch model that works nicely for my needs, and I now need to serve it so it can be called from another application. Speed matters, but throughput matters even more. Right now the model runs on the GPU and I can see one CPU thread at 100% while the other 11 sit completely idle. When I run it on the CPU instead, average CPU usage is a bit under 50%. What I would like is a pool of 3 models, 2 on CPU and 1 on GPU, with the server grabbing whichever model is available and favoring the GPU one when it is free.

Here is what I have built so far (non-essential lines removed):

DEVICE = 'cuda' if torch.cuda.is_available() else "cpu"
MODEL_ARCH = Models.YOLO_NAS_L
print("Device: ", DEVICE)
CHECKPOINT_DIR = f'{HOME}/checkpoints'
EXPERIMENT_NAME = "symbol_detection"

RUN_NAME = get_most_recent_directory(f"{CHECKPOINT_DIR}/{EXPERIMENT_NAME}")

best_model = models.get(
    MODEL_ARCH,
    num_classes=1,
    checkpoint_path=f"{CHECKPOINT_DIR}/{EXPERIMENT_NAME}/{RUN_NAME}/model.pth"
).to(DEVICE)

CONFIDENCE_TRESHOLD = 0.70
app = FastAPI()

@app.post("/predict", response_class=JSONResponse)
async def predict(request: Request, file: UploadFile = File(...)):

    # Read the image from the request
    contents = await file.read()
    image = Image.open(io.BytesIO(contents)).convert('RGB')

    # Make predictions
    result = best_model.predict(image, conf=CONFIDENCE_TRESHOLD)

    # Extract relevant information from the prediction
    # This part will depend on the specific structure of DetectionPrediction
    # Here's a basic example assuming it has 'boxes' and 'labels' attributes:
    boxes = result.prediction.bboxes_xyxy.tolist()
    labels = result.prediction.labels.tolist()
    scores = result.prediction.confidence.tolist()

    # Create a JSON-serializable response
    response_data = {
        "boxes": boxes,
        "labels": labels,
        "scores": scores
    }

    return JSONResponse(response_data)

How can I create 3 “best_model” instances (1 on GPU, 2 on CPU) and wait for one of them to become available before calling predict on it? I tried a ProcessPoolExecutor, but I couldn’t get workers holding models with different parameters, nor favor the GPU one. Any idea how something like this can be done?
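
To make it concrete, the call pattern I’m after inside the endpoint looks roughly like this (acquire_model and release_model are hypothetical helpers I don’t know how to write yet):

# Hypothetical helpers: acquire_model() should block until any model in the pool
# is free, preferring the GPU one; release_model() should hand it back to the pool.
model = acquire_model()
try:
    result = model.predict(image, conf=CONFIDENCE_TRESHOLD)
finally:
    release_model(model)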

Thanks.

Google Gemini proposed the code below. I like it, but I suspect a race condition: if is_available is checked by 2 threads at the same time, it can return true to both and things will start getting funky. In the Java world I would just wrap this in a synchronized block, but how do I do something similar in Python? Or does PyTorch already provide a pooling API?

Thanks.

import torch
from fastapi import FastAPI, UploadFile, File, Request
from fastapi.responses import JSONResponse
from PIL import Image
import io
from concurrent.futures import ThreadPoolExecutor
import asyncio  # needed for asyncio.wrap_future below

# ... your existing code ...

DEVICE = 'cuda' if torch.cuda.is_available() else "cpu"
MODEL_ARCH = Models.YOLO_NAS_L
print("Device: ", DEVICE)
CHECKPOINT_DIR = f'{HOME}/checkpoints'
EXPERIMENT_NAME = "symbol_detection"

RUN_NAME = get_most_recent_directory(f"{CHECKPOINT_DIR}/{EXPERIMENT_NAME}")

# Create a pool of models
models_pool = []

# Add the GPU model
if torch.cuda.is_available():
    gpu_model = models.get(
        MODEL_ARCH,
        num_classes=1,
        checkpoint_path=f"{CHECKPOINT_DIR}/{EXPERIMENT_NAME}/{RUN_NAME}/model.pth"
    ).to('cuda')
    gpu_model.is_available = True  # availability flag used by predict_with_available_model below
    models_pool.append(gpu_model)

# Add the CPU models
for _ in range(2):
    cpu_model = models.get(
        MODEL_ARCH,
        num_classes=1,
        checkpoint_path=f"{CHECKPOINT_DIR}/{EXPERIMENT_NAME}/{RUN_NAME}/model.pth"
    ).to('cpu')
    cpu_model.is_available = True  # availability flag used by predict_with_available_model below
    models_pool.append(cpu_model)

# Create a ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=len(models_pool))

CONFIDENCE_TRESHOLD = 0.70
app = FastAPI()

@app.post("/predict", response_class=JSONResponse)
async def predict(request: Request, file: UploadFile = File(...)):
    # Read the image from the request
    contents = await file.read()
    image = Image.open(io.BytesIO(contents)).convert('RGB')

    # Submit the prediction task to the executor, prioritizing the GPU model
    future = executor.submit(predict_with_available_model, image, CONFIDENCE_TRESHOLD)
    result = await asyncio.wrap_future(future)  # Make it awaitable

    # ... (rest of your prediction processing code) ...

def predict_with_available_model(image, CONFIDENCE_TRESHOLD):
    # Try to acquire the GPU model first
    for model in models_pool:
        # nn.Module has no .device attribute, so look at a parameter's device instead
        if next(model.parameters()).device.type == 'cuda' and model.is_available:
            try:
                model.is_available = False  # Mark as in use (this check-and-set is not atomic)
                return model.predict(image, conf=CONFIDENCE_TRESHOLD)
            except Exception as e:
                print(f"Error using GPU model: {e}")
            finally:
                model.is_available = True  # Mark as available again

    # If the GPU model is not available, fall back to a CPU model
    for model in models_pool:
        if next(model.parameters()).device.type == 'cpu' and model.is_available:
            try:
                model.is_available = False
                return model.predict(image, conf=CONFIDENCE_TRESHOLD)
            except Exception as e:
                print(f"Error using CPU model: {e}")
            finally:
                model.is_available = True

    return None  # Or handle the case where no model is available
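
What I have in mind, as the Python counterpart to Java’s synchronized (plus wait/notify), is wrapping the check-and-set in a threading.Condition. A minimal sketch on top of Gemini’s pool above (acquire_model and release_model are just names I made up; is_available is the flag set when the pool is built):

import threading

pool_cond = threading.Condition()  # plays the role of synchronized + wait/notify

def acquire_model():
    # Atomically grab a free model, preferring the GPU one; block until one is free.
    with pool_cond:
        while True:
            # GPU models sort first, so they get picked whenever they are free
            for model in sorted(models_pool,
                                key=lambda m: next(m.parameters()).device.type != 'cuda'):
                if model.is_available:
                    model.is_available = False
                    return model
            pool_cond.wait()  # nothing free: sleep until release_model() notifies us

def release_model(model):
    with pool_cond:
        model.is_available = True
        pool_cond.notify()  # wake up one waiting request

Would that be safe here, or is there a ready-made primitive (queue.Queue, asyncio.Queue, …) that already does this kind of pooling?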

I’ve fully explored this approach, and it doesn’t work. Uvicorn spawns a separate Python process for each worker, so each worker ends up creating its own pool of 3 models. Unfortunately, they all try to use the same GPU, which leads to conflicts. I read a bit about aiocache, but serializing and deserializing the model back and forth would probably kill the benefits.
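
For context, I assume the multiple pools come from how I launch the server, with several worker processes, something like this (main:app and the worker count are placeholders for my actual setup):

uvicorn main:app --workers 4

With a single worker process everything lives in one Python process, which is probably what an in-process pool like the one above needs anyway.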

Still looking for guidance and ideas.