Low DataLoader worker CPU utilization with PyTorch Lightning

Hi!
I’m training a small transformer using PyTorch Lightning on 2 GPUs via Slurm. GPU utilization is quite poor, and, depending on the num_workers I have set, each worker reaches at most (100 / num_workers) % CPU utilization. So when I set it to 4, I get 4 workers at 25% each. The `__getitem__` method of the underlying dataset takes ~2 ms, and all data comes from RAM. I’ve tested wrapping the dataset in a DataLoader locally, and there it runs as expected: each worker goes up to 100%, effectively parallelizing the work. But as soon as I run it via Lightning, with or without Slurm, the situation shown in the picture below unfolds.

image

Can anyone give some hints on what the problem is? I’m happy to answer clarifying questions or to share code if needed. Thanks!

It sounds like you’re mostly concerned about your GPU utilization being low. However, you’re debugging the CPU utilization of your dataloader. Have you confirmed that the reason for low GPU utilization is data starvation? (Otherwise, it may be that the dataloader is already running as fast as it needs to, so adding more workers doesn’t help and simply lowers the utilization per worker proportionally.)

Actually no. How can I do that?

You could try using the PyTorch profiler (PyTorch Profiler — PyTorch Tutorials 2.1.0+cu121 documentation) to look for one class of issues: if there are periods where no kernels are running on the GPU, then you are probably CPU-bound. Data starvation may cause this, but lots of other CPU code can also be running (as part of PyTorch operators, or as part of your own model code) — it’s important to find out which is the case before trying to optimize.

It’s also possible that you are always ‘running’ GPU kernels but the kernels are not utilizing the GPU hardware fully, perhaps due to a small batch size.

1 Like

So, this thread has aged a bit, but the problem persists. Here is an example output of the Lightning profiler:

| Action | Mean duration (s) | Num calls | Total time (s) | Percentage % |
|---|---|---|---|---|
| Total | - | 1904 | 124.29 | 100 % |
| run_training_epoch | 90.883 | 1 | 90.883 | 73.124 |
| [_TrainingEpochLoop].train_dataloader_next | 1.6427 | 50 | 82.137 | 66.087 |
| [_EvaluationLoop].val_next | 6.6148 | 3 | 19.845 | 15.967 |
| [Strategy]SingleDeviceStrategy.batch_to_device | 0.12114 | 52 | 6.2995 | 5.0685 |
| [LightningModule]TreeCrownDelineationModel.transfer_batch_to_device | 0.12102 | 52 | 6.2932 | 5.0635 |
| run_training_batch | 0.035017 | 50 | 1.7509 | 1.4087 |
| [LightningModule]TreeCrownDelineationModel.optimizer_step | 0.034779 | 50 | 1.739 | 1.3992 |

And here’s the code I’m using, including the DataModule and Dataset definition:

Click me
import os
import numpy as np
from os.path import join, splitext, basename
import torch
from pytorch_lightning.utilities.types import TRAIN_DATALOADERS, EVAL_DATALOADERS
from torch.utils.data import Dataset
from treecrowndelineation import TreeCrownDelineationModel
from pytorch_lightning import Trainer, LightningDataModule
from pytorch_lightning.callbacks import ModelCheckpoint, LearningRateMonitor
from pytorch_lightning.loggers import TensorBoardLogger
import albumentations as A
from glob import glob
from PIL import Image
from torch.utils.data import DataLoader

# Let PyTorch trade float32 matmul precision for speed where the hardware
# supports it ("medium" is the least precise setting; see the PyTorch docs for
# torch.set_float32_matmul_precision). Applies process-wide.
torch.set_float32_matmul_precision("medium")

#%%
class ISPRSDataset(Dataset):
    """Map-style dataset yielding (image, labels) pairs for tree-crown delineation.

    Each sample is assembled from four files at the same index: the input
    image, a mask, an outline raster and a distance-transform raster. The
    three label rasters are stacked along a new last axis, image and labels
    are passed through ``augmentation`` together, and both results are
    returned channels-first as ``float32`` NumPy arrays.

    Args:
        images: Sequence of input-image paths.
        masks: Sequence of mask paths, aligned index-by-index with ``images``.
        outlines: Sequence of outline paths, aligned with ``images``.
        dist_trafo: Sequence of distance-transform paths, aligned with ``images``.
        augmentation: Callable invoked as ``augmentation(image=..., mask=...)``
            returning a dict with ``"image"`` and ``"mask"`` entries
            (e.g. an ``albumentations.Compose``).
    """

    def __init__(self, images, masks, outlines, dist_trafo, augmentation):
        super().__init__()
        # We assume images and labels match perfectly; alignment is the
        # caller's responsibility and is not checked here.
        self.images = images
        self.masks = masks
        self.outlines = outlines
        self.dist_trafo = dist_trafo
        self.augmentation = augmentation

    def __len__(self):
        return len(self.images)

    @staticmethod
    def _read_array(path):
        """Decode the raster file at *path* into a NumPy array and close the file."""
        # Image.open is lazy and holds the file open; depending on the format
        # Pillow may not close it after decoding, so close it explicitly to
        # avoid accumulating open handles in long-lived DataLoader workers.
        with Image.open(path) as img:
            return np.array(img)

    def __getitem__(self, item):
        image = self._read_array(self.images[item])
        labels = np.stack(
            (
                self._read_array(self.masks[item]),
                self._read_array(self.outlines[item]),
                self._read_array(self.dist_trafo[item]),
            ),
            axis=-1,
        )

        augmented = self.augmentation(image=image, mask=labels)
        # Move the last (channel) axis to the front. Assumes the augmented
        # image is 3-D (H, W, C) — a single-band image would fail here.
        return (augmented["image"].transpose((2, 0, 1)).astype(np.float32),
                augmented["mask"].transpose((2, 0, 1)).astype(np.float32))


class ISPRSDataModule(LightningDataModule):
    """Lightning data module serving a random 80/20 train/validation split.

    Globs four file sets, sorts each by the base name it shares with the
    images, checks that they pair up one-to-one, and on ``setup`` builds
    :class:`ISPRSDataset` instances for training and validation.

    File-name convention (enforced by the checks in ``__init__``): an image
    ``<name>.png`` pairs with a mask ``<name>_<suffix>.png``, an outline
    ``<name>_<suffix>.png`` and a distance transform ``<name>_<a>_<b>.tif``.

    Args:
        images: Directory containing the ``*.png`` input images.
        masks: Directory containing the ``*.png`` masks.
        outlines: Directory containing the ``*.png`` outlines.
        dist_trafo: Directory containing the ``*.tif`` distance transforms.
        augmentation: Augmentation callable for training samples (also used
            for validation samples unless ``val_augmentation`` is given).
        workers: ``num_workers`` for both data loaders.
        batch_size: Batch size for both data loaders.
        val_augmentation: Optional separate augmentation for validation
            samples; ``None`` reuses ``augmentation``.
        split_seed: Seed for the train/validation split. A fixed seed keeps
            the split identical across repeated ``setup`` calls and across
            distributed processes; ``None`` uses torch's global RNG.

    Raises:
        AssertionError: If no images are found, the file counts differ, or
            the base names do not pair up.
    """

    def __init__(self, images, masks, outlines, dist_trafo, augmentation, workers=6, batch_size=64,
                 val_augmentation=None, split_seed=42):
        super().__init__()
        # Sort every list by the base name it shares with the images so that
        # index i refers to the same tile in all four arrays.
        self.images     = self._sorted_paths(join(images, "*.png"), drop=0)
        self.masks      = self._sorted_paths(join(masks, "*.png"), drop=1)
        self.outlines   = self._sorted_paths(join(outlines, "*.png"), drop=1)
        self.dist_trafo = self._sorted_paths(join(dist_trafo, "*.tif"), drop=2)
        self.augmentation = augmentation
        self.val_augmentation = augmentation if val_augmentation is None else val_augmentation
        self.workers = workers
        self.batch_size = batch_size
        self.split_seed = split_seed

        # sanity checks
        assert len(self.images) > 0, f"no *.png images found in {images!r}"
        assert len(self.images) == len(self.masks) == len(self.outlines) == len(self.dist_trafo), (
            f"file counts differ: images={len(self.images)}, masks={len(self.masks)}, "
            f"outlines={len(self.outlines)}, dist_trafo={len(self.dist_trafo)}")

        # compare basenames pairwise against the images
        im_bn = [self._base_key(p, 0) for p in self.images]
        for label, paths, drop in (("masks", self.masks, 1),
                                   ("outlines", self.outlines, 1),
                                   ("dist_trafo", self.dist_trafo, 2)):
            assert [self._base_key(p, drop) for p in paths] == im_bn, \
                f"{label} do not pair up with images by base name"

    @staticmethod
    def _base_key(path, drop):
        """Return the file stem of *path* without its last *drop* ``_``-separated fields."""
        stem = splitext(basename(path))[0]
        # Slicing with [:-0] would yield an empty list, so drop == 0 is special-cased.
        return stem if drop == 0 else "_".join(stem.split("_")[:-drop])

    @classmethod
    def _sorted_paths(cls, pattern, drop):
        """Glob *pattern* and return the matches as an array sorted by their base key."""
        return np.array(sorted(glob(pattern), key=lambda p: cls._base_key(p, drop)))

    def _subset_dataset(self, idx, augmentation):
        """Build an :class:`ISPRSDataset` over the tiles selected by integer array *idx*."""
        return ISPRSDataset(self.images[idx],
                            self.masks[idx],
                            self.outlines[idx],
                            self.dist_trafo[idx],
                            augmentation=augmentation)

    def setup(self, stage: str) -> None:
        """Split the tiles 80/20 into ``self.train_ds`` and ``self.val_ds``."""
        # Each DDP process calls setup() itself; with an unseeded global RNG the
        # processes could draw different splits and leak validation tiles into training.
        generator = (torch.default_generator if self.split_seed is None
                     else torch.Generator().manual_seed(self.split_seed))
        train_subset, val_subset = torch.utils.data.random_split(
            np.arange(len(self.images)), (0.8, 0.2), generator=generator)
        # Explicit integer dtype so an empty split still yields a valid index array.
        train_idx = np.asarray(train_subset.indices, dtype=np.intp)
        val_idx = np.asarray(val_subset.indices, dtype=np.intp)
        self.train_ds = self._subset_dataset(train_idx, self.augmentation)
        self.val_ds   = self._subset_dataset(val_idx, self.val_augmentation)

    def train_dataloader(self) -> TRAIN_DATALOADERS:
        # shuffle=True: reshuffle every epoch (Lightning also derives the
        # DistributedSampler's shuffle flag from this under DDP).
        # persistent_workers avoids re-spawning workers each epoch; it is only
        # valid when num_workers > 0.
        return DataLoader(self.train_ds, batch_size=self.batch_size, shuffle=True, pin_memory=False,
                          num_workers=self.workers, persistent_workers=self.workers > 0)

    def val_dataloader(self) -> EVAL_DATALOADERS:
        return DataLoader(self.val_ds, batch_size=self.batch_size, pin_memory=False,
                          num_workers=self.workers, persistent_workers=self.workers > 0)


#%%
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# base = "/data_hdd/isprs-itc-seg/new/train/"
base = "/tmp/train/"
logdir = "./log"
model_save_path = "./models"
experiment_name = "isprs"

arch = "Unet-resnet18"
width = 256  # side length (pixels) of the random training crops and of the export example input
batchsize = 16
in_channels = 3
devices = 1  # number of gpus, if you have multiple
accelerator = "auto"  # or gpu or cpu, see lightning docs
max_epochs = 30 + 60 - 1
lr = 3E-4

# NOTE: not passed to ISPRSDataModule, whose setup() performs its own 80/20 split.
training_split = 0.8

model_name = "{}_epochs={}_lr={}_width={}_bs={}".format(arch,
                                                        max_epochs,
                                                        lr,
                                                        width,
                                                        batchsize)
checkpoint_dir = join(os.path.abspath(model_save_path), experiment_name)

# ---------------------------------------------------------------------------
# Training. Guarded so that worker processes started with the "spawn" method
# (which re-import this module) do not re-run training.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logger = TensorBoardLogger(logdir,
                               name=experiment_name,
                               # version=model_name,
                               default_hp_metric=False)

    cp = ModelCheckpoint(dirpath=checkpoint_dir,
                         filename=model_name + "-{epoch}",
                         monitor="val/loss",
                         save_last=True,
                         save_top_k=2)

    callbacks = [cp, LearningRateMonitor()]

    # p=1.0 is equivalent to the deprecated always_apply=True.
    train_augmentation = A.Compose([A.RandomCrop(width, width, p=1.0),
                                    A.RandomRotate90(),
                                    A.VerticalFlip()
                                    ])

    data = ISPRSDataModule(images=join(base, "images"),
                           masks=join(base, "masks"),
                           outlines=join(base, "outlines"),
                           dist_trafo=join(base, "dist_trafo"),
                           augmentation=train_augmentation,
                           # Without this the DataModule default (64) is used,
                           # contradicting the "bs=" recorded in model_name.
                           batch_size=batchsize)

    model = TreeCrownDelineationModel(in_channels=in_channels, lr=lr)
    # model = torch.compile(model)

    trainer = Trainer(devices=devices,
                      accelerator=accelerator,
                      logger=logger,
                      callbacks=callbacks,
                      max_epochs=max_epochs,
                      enable_progress_bar=False,
                      profiler="simple",
                      # Profiling/debug cap: training stops after 50 steps, long
                      # before max_epochs is reached. Remove for a full run.
                      max_steps=50,
                      )
    trainer.fit(model, data)

    # Export a TorchScript version of the trained model, traced on the CPU
    # with a dummy input of the training crop size.
    model.to("cpu")
    t = torch.rand(1, in_channels, width, width, dtype=torch.float32)
    model.to_torchscript(
        join(checkpoint_dir, model_name + "_jitted.pt"),
        method="trace",
        example_inputs=t)

One dataset `__getitem__` call takes 38 ms. So assembling a batch of 64 should take around 2.4 s in one process and around 400 ms with 6 processes. But it actually takes around 1.7 s. All images are again on a RAM disk.

The solution was to update PyTorch Lightning from 2.0.3 to 2.3.0.

That didn't work for me, unfortunately. Sad.