Use case for loading the entire dataset into RAM

Hi,

Currently, I am using an HPC cluster, and the program's running time is much longer than on my own machine. I profiled it and found that the bottleneck seems to be data loading. After checking the HPC hardware specification, I found that the data is stored on another node, so data transfer depends on the internal network, which can be unstable or slow when the cluster traffic gets heavy.

My intended solution is to load the entire dataset into RAM instead of reading it on the fly. Here are some questions I want to clarify:

Suppose my dataset is 140 GB and I have 180 GB of RAM.

  1. How does num_workers in DataLoader affect RAM usage? Would the RAM consumption stay the same when num_workers > 1, since I have already loaded the dataset into RAM and there should be no extra space consumption?
  2. Under DDP, would the program keep two copies of the dataset in RAM, or would both processes refer to the same memory pages? In my case, I can fit one copy of the dataset into RAM, but not two.
  3. In this case, would setting num_workers > 1 still help speed up data loading?

Thank you for your time and help!
Kind regards.
Dylan

Just a suggestion: don’t use the vanilla DataLoader for this. If you can fit all of the data into RAM, just turn each set of data into a single tensor, with shape:

self.dataset.size()  # torch.Size([num_samples, ...])

where the first dimension indexes the samples.
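
For instance, for an image task the in-RAM tensor might look like this (the sizes here are just made-up placeholders):

import torch

# hypothetical example: 10,000 RGB images of 224x224 kept as a single uint8 tensor
dataset = torch.empty((10_000, 3, 224, 224), dtype=torch.uint8)
print(dataset.size())  # torch.Size([10000, 3, 224, 224]), roughly 1.5 GB of RAM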

Then create an index array, either in NumPy or PyTorch. Here is a NumPy example:

import numpy as np

rng = np.random.default_rng()

...
self.data_index = np.arange(self.dataset.size()[0])

Then define a shuffler method you can call whenever you want to shuffle the index:

def shuffler(self):
    rng.shuffle(self.data_index)

We need to define batch_size, batch_idx, and max_idx in __init__:

self.batch_size = batch_size
self.max_idx = self.dataset.size()[0] // self.batch_size
self.batch_idx = 0

Now we can define a get_batch function:

def get_batch(self):
    start = self.batch_idx * self.batch_size
    if self.batch_idx == self.max_idx - 1:
        # last batch of the epoch: take everything that is left, then reset
        batch = self.dataset[self.data_index[start:], ...]
        self.batch_idx = 0
    else:
        batch = self.dataset[self.data_index[start:start + self.batch_size], ...]
        self.batch_idx += 1

    return batch

That should get you started, at least.

For an additional speed boost, you can load the dataset to a spare GPU and load/process from there.
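
For example, a minimal sketch of that idea, modelled on the get_batch above (cuda:1 as the spare GPU and cuda:0 as the training GPU are just assumptions about your setup):

# keep the whole dataset tensor on an otherwise idle GPU...
self.dataset = self.dataset.to('cuda:1')

# ...and when batching, index on that GPU and ship only the batch to the training GPU
idx = torch.as_tensor(self.data_index[start:start + self.batch_size], device='cuda:1')
batch = self.dataset[idx].to('cuda:0', non_blocking=True)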


Thank you for the suggestion! Here are some concerns I have about a self-defined dataset:

  1. Since I am using DDP for multi-GPU training, I need to take extra care with data sampling to ensure the samples do not overlap across processes.
  2. Since I am working on a computer vision task with relatively heavy data transformations, would loading become slower if I do not use the num_workers feature?

Really appreciate your help!

Ideally, you’d move any preprocessing that cannot be done in advance to the GPU. Dataset in RAM and preprocessing on the GPU should get you good GPU utilization without multiple workers.

Best regards

Thomas


For instance, in the get_batch, you could do:

def get_batch(self):
    start = self.batch_idx * self.batch_size
    if self.batch_idx == self.max_idx - 1:
        batch = self.dataset[self.data_index[start:], ...]
        self.batch_idx = 0
    else:
        batch = self.dataset[self.data_index[start:start + self.batch_size], ...]
        self.batch_idx += 1

    batch = self.process_batch(batch.to(device))  # send it to the GPU for faster processing
    return batch

And then you can define your transforms inside that method:

def process_batch(self, data):
    data = data / 255. #normalize images
    #other transforms
    return data

Of course, all of the above would be methods on a single class.
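
Putting the snippets in this thread together, a minimal sketch of such a class could look like the following (the class name InMemoryBatcher and the device argument are just illustrative choices, not anything from PyTorch itself):

import numpy as np
import torch

class InMemoryBatcher:
    def __init__(self, dataset, batch_size, device='cuda:0'):
        self.dataset = dataset                      # one big tensor, first dim = samples
        self.batch_size = batch_size
        self.device = device
        self.rng = np.random.default_rng()
        self.data_index = np.arange(self.dataset.size()[0])
        self.max_idx = self.dataset.size()[0] // self.batch_size
        self.batch_idx = 0

    def shuffler(self):
        self.rng.shuffle(self.data_index)           # reshuffle the sample order in place

    def process_batch(self, data):
        data = data / 255.                          # normalize images
        # other transforms
        return data

    def get_batch(self):
        start = self.batch_idx * self.batch_size
        if self.batch_idx == self.max_idx - 1:
            batch = self.dataset[self.data_index[start:], ...]
            self.batch_idx = 0
        else:
            batch = self.dataset[self.data_index[start:start + self.batch_size], ...]
            self.batch_idx += 1
        return self.process_batch(batch.to(self.device))

Used roughly like this in the training loop:

batcher = InMemoryBatcher(dataset, batch_size=64)
for epoch in range(num_epochs):
    batcher.shuffler()
    for _ in range(batcher.max_idx):
        batch = batcher.get_batch()
        # forward / backward on batch here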


By the way, you might need to tweak some things; I wrote this at a cafe on my phone while waiting for lunch. For example, you may have to handle the case where batch_size does not evenly divide dataset.size()[0] with an if statement, or the final batch of each epoch will come out a different size than batch_size.
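
One way to handle that uneven case (a sketch, not the only option) is to round max_idx up, so the remainder simply becomes a smaller final batch, which the [start:] branch in get_batch already copes with:

import math

# in __init__: round up, so a non-divisible dataset ends each epoch with one smaller batch
self.max_idx = math.ceil(self.dataset.size()[0] / self.batch_size)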


That is to say, under DDP, I have to initialise the dataset before mp.spawn(); otherwise, there will be two copies of the data in RAM, right?

If you’re loading all of the data into RAM (or onto a GPU), there is no need for mp.spawn(). PyTorch tensors are handled asynchronously whether on CPU or GPU. You could del raw_loaded_data after you’ve placed it into the dataset tensor. Or you might use mp.spawn() for the initial loading process.

This is with cv2 and NumPy:

import os

import cv2
import torch

# image_height, image_width and channels are placeholders for your actual image shape
dataset = torch.empty((0, image_height, image_width, channels), dtype=torch.uint8)

file_path = 'files/images/'

def load_images(dataset, file_path):
    for file in os.listdir(file_path):
        image = cv2.imread(os.path.join(file_path, file))
        # use this spot to remove bad images, for example:
        if image is None:  # cv2.imread returns None when it cannot read a file
            continue
        # use this spot to resize/crop images, if they are not already the same size
        dataset = image2tensor(dataset, image)
    return dataset

def image2tensor(dataset, image):
    return torch.cat([dataset, torch.from_numpy(image).unsqueeze(0)])

dataset = load_images(dataset, file_path)

That’s a simple synchronous way to build the initial dataset tensor. In this example, each image is only held temporarily, so it shouldn’t cause any RAM issues, but you could del image at the end of the loop if you want to be safe.
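
If the dataset is large, note that the torch.cat call above copies the whole growing tensor for every image; one alternative (a sketch, assuming all images share the same shape after your resize/crop step) is to preallocate the full tensor once and fill it in place:

import os

import cv2
import torch

files = sorted(os.listdir(file_path))
# image_height, image_width and channels are placeholders for your actual image shape
dataset = torch.empty((len(files), image_height, image_width, channels), dtype=torch.uint8)

for i, file in enumerate(files):
    image = cv2.imread(os.path.join(file_path, file))
    if image is None:
        continue  # or record bad indices and trim the tensor afterwards
    dataset[i] = torch.from_numpy(image)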


Thank you for your patience and reply! However, I am still a little confused about why you say I do not need mp.spawn(); isn’t it a necessary part of DDP? I found standard DDP training code here:

import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from datautils import MyTrainDataset

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os


def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    init_process_group(backend="nccl", rank=rank, world_size=world_size)

class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        gpu_id: int,
        save_every: int,
    ) -> None:
        self.gpu_id = gpu_id
        self.model = model.to(gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
        self.model = DDP(model, device_ids=[gpu_id])

    def _run_batch(self, source, targets):
        self.optimizer.zero_grad()
        output = self.model(source)
        loss = F.cross_entropy(output, targets)
        loss.backward()
        self.optimizer.step()

    def _run_epoch(self, epoch):
        b_sz = len(next(iter(self.train_data))[0])
        print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
        self.train_data.sampler.set_epoch(epoch)
        for source, targets in self.train_data:
            source = source.to(self.gpu_id)
            targets = targets.to(self.gpu_id)
            self._run_batch(source, targets)

    def _save_checkpoint(self, epoch):
        ckp = self.model.module.state_dict()
        PATH = "checkpoint.pt"
        torch.save(ckp, PATH)
        print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")

    def train(self, max_epochs: int):
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            if self.gpu_id == 0 and epoch % self.save_every == 0:
                self._save_checkpoint(epoch)


def load_train_objs():
    train_set = MyTrainDataset(2048)  # load your dataset
    model = torch.nn.Linear(20, 1)  # load your model
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    return train_set, model, optimizer


def prepare_dataloader(dataset: Dataset, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,
        sampler=DistributedSampler(dataset)
    )


def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
    ddp_setup(rank, world_size)
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_data, optimizer, rank, save_every)
    trainer.train(total_epochs)
    destroy_process_group()


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
    parser.add_argument('save_every', type=int, help='How often to save a snapshot')
    parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
    args = parser.parse_args()
    
    world_size = torch.cuda.device_count()
    mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)

In this case, do I have to set up the dataset (dataset = load_images(dataset, file_path)) before mp.spawn() to make sure there is only a single copy of the dataset in RAM?

I see what you mean. Yes, load the dataset first and call mp.spawn() afterward.
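
To make the ordering concrete, here is a rough sketch building on the load_images() and ddp_setup() snippets above (the share_memory_() call is my assumption for keeping a single copy visible to the spawned workers with the spawn start method; worth verifying memory usage on your cluster):

import numpy as np
import torch
import torch.multiprocessing as mp

def main(rank: int, world_size: int, dataset: torch.Tensor):
    ddp_setup(rank, world_size)
    # non-overlapping shard of sample indices for this rank, in place of DistributedSampler
    indices = np.arange(dataset.size()[0])[rank::world_size]
    # build the in-RAM batcher from dataset and indices here, then train as before
    destroy_process_group()

if __name__ == "__main__":
    dataset = load_images(dataset, file_path)  # dataset/file_path from the cv2 snippet above
    dataset.share_memory_()                    # move the storage to shared memory before spawning
    world_size = torch.cuda.device_count()
    mp.spawn(main, args=(world_size, dataset), nprocs=world_size)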