Using CUDA streams to perform parallel inference

Hi,
I am working on code that runs inference in parallel on a single GPU, using threads and CUDA streams. After a lot of testing, I have not been able to achieve parallel execution within the GPU. I have used NVIDIA Nsight Systems to check whether it is working correctly.

I am using the following versions:

  • Python: 3.10.14
  • CUDA: 11.3 (cuda_11.3.r11.3)
  • PyTorch: 2.3.0
  • GPU: NVIDIA A100
import torch
import torchvision.transforms as transforms
from PIL import Image
import time
import threading
from datasets import load_dataset
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import subprocess
import logging
import nvtx 

# Root logger setup: INFO and above, each record prefixed with a timestamp and its level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Function to get GPU memory information
def get_gpu_memory_info():
    result = subprocess.run(['nvidia-smi', '--query-gpu=memory.total,memory.used,memory.free', '--format=csv,noheader,nounits'],
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"Failed to run nvidia-smi: {result.stderr}")
    total, used, free = map(int, result.stdout.strip().split(', '))
    return total, used, free

# Function to preprocess an image
def preprocess_image(pil_image, device):
    """Convert a PIL image into a normalised ``(1, 3, 224, 224)`` float tensor on *device*.

    The image is forced to RGB, resized to 224x224, and converted to a CHW tensor.
    It is then normalised with the ImageNet per-channel mean/std, given a leading
    batch dimension of 1, and moved to *device*.

    Args:
        pil_image: A ``PIL.Image.Image`` in any mode.
        device: Target ``torch.device`` (or device string).

    Returns:
        The preprocessed batch tensor.
    """
    preprocess = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    # Normalize uses 3-channel statistics. Grayscale ("L") or CMYK inputs (ImageNet
    # contains some) would give 1 or 4 channels and fail, so convert to RGB first.
    img_tensor = preprocess(pil_image.convert("RGB"))
    return img_tensor.unsqueeze(0).to(device)

# Function to perform prediction
def predict(model, img_tensor, label, thread_id, stream):
    """Run one gradient-free forward pass and log the top-1 class index.

    The forward pass is wrapped in an NVTX range named after the thread and the
    stream so it can be located in an Nsight Systems timeline. *stream* is only
    used for that range name: the kernels go to whichever CUDA stream is current
    in the calling thread. *label* is accepted but not used. ``.item()`` copies
    the class index to the host, so this returns only after that GPU work has
    completed.
    """
    range_name = f"Thread-{thread_id}: Inference on Stream {stream}"
    with torch.no_grad():
        with nvtx.annotate(range_name):
            logits = model(img_tensor)
    top1 = logits.argmax(dim=1).item()
    logging.info(f"Thread {thread_id} prediction: {top1}")

# Function to load the model
def load_model(name, device, *, weights=None):
    """Load torchvision model *name* through torch.hub, move it to *device* and set eval mode.

    Args:
        name: torchvision model builder name, e.g. ``"mobilenet_v2"``.
        device: Target ``torch.device`` (or device string).
        weights: Optional torchvision weights identifier (e.g. ``"IMAGENET1K_V1"``
            or ``"DEFAULT"``). When ``None`` (the default), the legacy
            ``pretrained=True`` flag is passed, which keeps the original weight
            selection. torchvision 0.13+ deprecates ``pretrained`` in favour of
            ``weights``, so passing ``weights`` avoids the deprecation warning.

    Returns:
        The model, in eval mode, on *device*.
    """
    if weights is None:
        hub_kwargs = {"pretrained": True}
    else:
        hub_kwargs = {"weights": weights}
    model = torch.hub.load("pytorch/vision", name, **hub_kwargs)
    model.to(device)
    model.eval()
    return model

# Function for the prediction thread
def predict_thread(img_tensor, label, barrier, stream, model, thread_id):
    """Worker body: run one prediction with *stream* as the current CUDA stream.

    The worker waits on *barrier* twice:
    - before predicting, so all workers and the timing thread start together;
    - after predicting, so the timing thread knows every worker has finished.
      ``predict`` calls ``.item()``, so the GPU work is complete by then.

    If anything raises, the barrier is aborted before re-raising. The other
    parties then get ``threading.BrokenBarrierError`` instead of waiting forever
    for a party that will never arrive.
    """
    thread_name = threading.current_thread().name
    logging.info(f"Thread {thread_id} ({thread_name}) start time: {time.perf_counter():.4f}")
    try:
        with torch.cuda.stream(stream):
            # The PyTorch CUDA-semantics docs make cross-stream ordering the user's
            # responsibility on non-default streams. Order this stream after the
            # default stream, where (presumably, as in the __main__ block) the input
            # tensor and model weights were produced.
            stream.wait_stream(torch.cuda.default_stream(stream.device))
            barrier.wait()
            predict(model, img_tensor, label, thread_id, stream)
            barrier.wait()
    except BaseException:
        barrier.abort()
        raise
    logging.info(f"Thread {thread_id} ({thread_name}) end time: {time.perf_counter():.4f}")

# Function to run the test with multiple threads
def run_test(num_threads, img_array, label, model):
    """Run one concurrent inference round and return its wall-clock duration in seconds.

    Starts *num_threads* worker threads, each with its own CUDA stream. All
    workers and this thread share a barrier of ``num_threads + 1`` parties. The
    timer starts once every worker has reached the first barrier and stops once
    every worker has reached the second (i.e. finished its prediction).

    NOTE(review): with a single-image batch (as the __main__ block builds), each
    forward pass is many small kernels. Overlap between streams is then likely
    limited by Python-side kernel-launch overhead under the GIL rather than by the
    streams themselves. Confirm in the Nsight Systems timeline.

    Returns:
        Elapsed time between the two barrier crossings, from ``time.perf_counter``.
    """
    barrier = threading.Barrier(num_threads + 1)
    # One dedicated stream per worker thread.
    streams = [torch.cuda.Stream() for _ in range(num_threads)]
    threads = [
        threading.Thread(
            target=predict_thread,
            args=(img_array, label, barrier, stream, model, thread_id),
            name=f"Thread-{thread_id}",
        )
        for thread_id, stream in enumerate(streams, start=1)
    ]

    for thread in threads:
        thread.start()

    try:
        # Wait for all threads to be ready before starting the time measurement.
        barrier.wait()
        start_time = time.perf_counter()
        barrier.wait()
        end_time = time.perf_counter()
    except BaseException:
        # Release any worker still blocked on the barrier so the joins below can finish.
        barrier.abort()
        raise
    finally:
        for thread in threads:
            thread.join()

    return end_time - start_time

# Function to repeat the tests multiple times
def repeat_tests(num_threads, img_array, label, model, num_repetitions):
    """Call ``run_test`` *num_repetitions* times and summarise the elapsed times.

    Returns:
        A dict with keys ``'threads'`` (echo of *num_threads*), ``'mean_time'``,
        ``'median_time'``, ``'max_time'`` and ``'min_time'`` (seconds).

    Raises:
        ValueError: If *num_repetitions* is less than 1, since there would be no
            timings to summarise.
    """
    if num_repetitions < 1:
        raise ValueError(f"num_repetitions must be >= 1, got {num_repetitions}")

    times = []
    for repetition in range(num_repetitions):
        logging.info(f"Starting repetition {repetition}")
        times.append(run_test(num_threads, img_array, label, model))

    times = np.asarray(times)
    return {
        'threads': num_threads,
        'mean_time': np.mean(times),
        'median_time': np.median(times),
        'max_time': np.max(times),
        'min_time': np.min(times),
    }

if __name__ == "__main__":
    if not torch.cuda.is_available():
        raise RuntimeError("GPU is not available. Please check your CUDA installation.")

    device = torch.device('cuda')
    logging.info(f"Using device: {device}")

    # Load example data. streaming=True fetches only the records actually iterated,
    # instead of downloading and preparing the entire ImageNet-1k train split just
    # to take its first sample.
    dataset_train = load_dataset('imagenet-1k', split='train', streaming=True, trust_remote_code=True)
    img_data = next(iter(dataset_train))
    img_array = preprocess_image(img_data['image'], device)
    label = img_data['label']

    num_threads_list = [1, 2, 4, 8, 16]
    num_repetitions = 100

    # GPU memory information before loading the model
    total_memory_before, used_memory_before, free_memory_before = get_gpu_memory_info()
    logging.info(f"Total GPU Memory (before loading model): {total_memory_before} MB")
    logging.info(f"Used GPU Memory (before loading model): {used_memory_before} MB")
    logging.info(f"Free GPU Memory (before loading model): {free_memory_before} MB")

    # Load model
    model = load_model("mobilenet_v2", device)

    # Warm-up: the first forward pass typically pays one-off costs (lazy kernel/module
    # loading, cuDNN algorithm selection, caching-allocator growth). Run it once
    # untimed so those costs do not land in the first measured repetition.
    with torch.no_grad():
        model(img_array)
    torch.cuda.synchronize()

    results = []
    for num_threads in num_threads_list:
        stats = repeat_tests(num_threads, img_array, label, model, num_repetitions)
        results.append(stats)

    # Report the collected timing statistics, one row per thread count.
    logging.info("Timing results:\n%s", pd.DataFrame(results).to_string(index=False))

Could someone tell me why torch.cuda.stream is not working as it should? I don't know whether the problem is an error in my code or how streams are implemented in PyTorch. Thank you.