Pytorch load and train data async

Hello, it is my first time in the forum
Recently I am doing a medical image research, which requires loading a very big pathology slide(about 5GBytes), and doing some CNN jobs.

Problem

Is it possible to run dataloading and model.run() in an async fashion (producer and consumer)?

  • model.run() is executing in GPU
  • dataloading() is executing in CPU
    I have understand the ideas and usage of python asyncio library
    And the toy example works fine below

Toy example code and profiling result

  • Code
    (reason I set wait_consumer to 3.2 and wait_producer to 1.7 is because that in the real scenario, the consumer (model.run() takes 320 sec to finish while 170 for imgloader )
wait_consumer = 3.2
wait_pdocuer = 1.7
async def consumer(q):
    while True:
        x, xx = await q.get()
        q.task_done()


async def producer(q, n):
    for x in range(n):
        await asyncio.sleep(wait_producer) # IO takes time for 10 sec
        await q.put((x, x * x))

async def async_main(n):
    q = asyncio.Queue()
    consumers = asyncio.ensure_future(consumer(q))
    await producer(q, n)
    await q.join()
    consumers.cancel() 

def main_run_async(n):
    t_start = time.time()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_main(n))
    loop.close()

    t_end = time.time()
    return t_end - t_start
  • Profiling the results
    The async property works well, the orange and blue bar symbolizes the async, producer and consumer version while the green bar is the naïve serial one.

Project code

  • Now change the asyncio.sleep() to real code in my research project (some of the name has been hidden due to confidentiality, but most of the architecture works fine and very similar to that of above)
async def model_run_async(model):
    model.run() # Model run in GPU


async def fun_consumer(q, args):
    while True:
        iteration, img_loader = await q.get() # Popped the produced img_loader from queue

        model = ProjectModel(
            image_loader=img_loader,
            image_size=args.img_size,
        )

        print('[ASYNC LOG CONS] Start time {}'.format(time.time() - t_zero))
        t_consumer_start.append(time.time() - t_zero)
        await model_run_async(model)
        t_consumer_end.append(time.time() - t_zero)
        print('[ASYNC LOG CONS] End time {}'.format(time.time() - t_zero))
        q.task_done()

async def image_load_async(args):
    img_loader = BaselineImageLoader(
            slide_path=args.slide_path,
            resize_ratio=args.resize_ratio,
            img_size=args.img_size,
    )
    return img_loader

async def fun_producer(q, args):
    for iteration in range(args.iters):

        print('[ASYNC LOG PROD] Start time {}'.format(time.time() - t_zero))
        t_producer_start.append(time.time() - t_zero)
        img_loader = await image_load_async(args) # block here and hope I can run consumer in this stage (train the last-batch data during loading the current batch of data)
# Load big medical slide data in CPU
        print('[ASYNC LOG PROD] End time {}'.format(time.time() - t_zero))
        q.put((iteration, img_loader)) # Put into queue
        t_producer_end.append(time.time() - t_zero)

async def async_main(args):
    q = asyncio.Queue()
    consumers = asyncio.ensure_future(fun_consumer(q, args))
    await fun_producer(q, args)
    await q.join()
    consumers.cancel()

def main_run_async(args):
    t_start = time.time()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_main(args))
    loop.close()

    t_end = time.time()
    return t_end - t_start
  • Profiling the results
    Only the producer seems to be executing while the consumer does not work similar to that of the toy example (Sorry that I cannot post more than 1 image)
    [ASYNC LOG PROD] Start time 0, End time 166
    [ASYNC LOG PROD] Start time 166, End time 332
    [ASYNC LOG PROD] Start time 332, End time 499
    … etc, but WITHOUT THE LOG FROM CONSUMER (model run)

Thank you very much in advance for the kindness help, it really helps my research.

My leased GPU server HW and SW configuration

CPU: Xeon Gold 6154
GPU: NVIDIA Tesla V100

Ubuntu 18.04.5 LTS
NVIDIA-SMI 450.80.02 CUDA 11.1

Pytorch: 1.8.0a0+17f8c32
Torchvision: 0.8.0a0
Python: 3.6.10 (anaconda, unable to change since I am constrained in container)

Would it be possible to write a custom Dataset as described here and pass it to a DataLoader, which can use multiple workers and could thus hide the latency of the data loading and processing?