DataLoader bug when num_workers > 0

It seems you are facing different issues at the same time.
If I understand the post correctly, you are seeing:

  • Caffe2 warning → ignore it, as Caffe2 was disabled and the warning has no effect
  • BrokenPipeError when writing to a file → check if multiple processes are writing to the same file (see the sketch after this list)
  • accuracy drop when using pin_memory=True → could you post a code snippet to reproduce this issue?
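A quick way to check the file-writing point is to log which process executes the write, e.g. with this minimal snippet placed right before wherever the file is written (torch.utils.data.get_worker_info() returns None in the main process and a WorkerInfo object inside a worker):

from torch.utils.data import get_worker_info

worker = get_worker_info()  # None in the main process, a WorkerInfo object inside a worker
origin = "main process" if worker is None else f"worker {worker.id}"
print(f"file write issued from {origin}")
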
import torch
from utils.torch_utils import init_seed

from datasets.get_dataset import get_dataset
from models.get_model import get_model
from losses.get_loss import get_loss
from trainer.get_trainer import get_trainer


def main(cfg, _log):
    init_seed(cfg.seed)

    _log.info("=> fetching img pairs.")
    train_set, valid_set = get_dataset(cfg)

    _log.info('{} samples found, {} train samples and {} test samples '.format(
        len(valid_set) + len(train_set),
        len(train_set),
        len(valid_set)))

    train_loader = torch.utils.data.DataLoader(
        train_set, batch_size=cfg.train.batch_size,
        num_workers=cfg.train.workers, pin_memory=True, shuffle=True)

    max_test_batch = 4
    if type(valid_set) is torch.utils.data.ConcatDataset:
        valid_loader = [torch.utils.data.DataLoader(
            s, batch_size=min(max_test_batch, cfg.train.batch_size),
            num_workers=min(4, cfg.train.workers),
            pin_memory=True, shuffle=False) for s in valid_set.datasets]
        valid_size = sum([len(l) for l in valid_loader])
    else:
        valid_loader = torch.utils.data.DataLoader(
            valid_set, batch_size=min(max_test_batch, cfg.train.batch_size),
            num_workers=min(4, cfg.train.workers),
            pin_memory=True, shuffle=False)
        valid_size = len(valid_loader)

    if cfg.train.epoch_size == 0:
        cfg.train.epoch_size = len(train_loader)
    if cfg.train.valid_size == 0:
        cfg.train.valid_size = valid_size
    cfg.train.epoch_size = min(cfg.train.epoch_size, len(train_loader))
    cfg.train.valid_size = min(cfg.train.valid_size, valid_size)

    model = get_model(cfg.model)
    loss = get_loss(cfg.loss)
    trainer = get_trainer(cfg.trainer)(
        train_loader, valid_loader, model, loss, _log, cfg.save_root, cfg.train)

    trainer.train()

When I changed pin_memory to False, the accuracy dropped. With pin_memory=False I next tried setting workers=0, but the training became too slow.
I thought these two variables were causing the errors and warnings, which is why I am trying to change them.

Your code is unfortunately not executable, so we won’t be able to reproduce it.
Could you try to use random tensors and check if you are still observing the accuracy drop?
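For example, something along the lines of this minimal sketch, keeping your training loop unchanged and only swapping the dataset; the shapes and the number of classes are placeholders for whatever your real samples look like:

import torch
from torch.utils.data import TensorDataset, DataLoader

# Placeholder shapes; adjust to match your real inputs and targets.
inputs = torch.randn(1000, 3, 224, 224)
targets = torch.randint(0, 10, (1000,))
random_set = TensorDataset(inputs, targets)

# Train once with each setting, everything else identical, and compare the accuracy curves.
loader_pinned = DataLoader(random_set, batch_size=8, shuffle=True, num_workers=4, pin_memory=True)
loader_unpinned = DataLoader(random_set, batch_size=8, shuffle=True, num_workers=4, pin_memory=False)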

How would I check that, please? The code was working fine with the old torch version 1.1, so could the new torch version be the reason for this error?

Hey! Sorry to revive such an old topic, but I've recently been redoing my implementation of a project to squeeze the maximum amount of performance out of it and decided to use an HDF5 file for my datasets.
In your recommendations you mention using a DataLoader with a batch_sampler because random access to HDF5 files is slow. What would be the difference between using a custom batch_sampler to grab random samples and just using the default sampler with shuffle=True?
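From what I understand, the batch_sampler idea is to shuffle the order of contiguous index blocks instead of individual indices, so every batch is read from one sequential region of the HDF5 file. Roughly like this sketch (the class name and chunking are my own, just to illustrate what I mean):

import random
from torch.utils.data import Sampler

class ContiguousBatchSampler(Sampler):
    """Yields batches of contiguous indices; only the order of the batches is shuffled."""
    def __init__(self, dataset_len, batch_size, shuffle=True):
        self.batches = [list(range(start, min(start + batch_size, dataset_len)))
                        for start in range(0, dataset_len, batch_size)]
        self.shuffle = shuffle

    def __iter__(self):
        if self.shuffle:
            random.shuffle(self.batches)
        return iter(self.batches)

    def __len__(self):
        return len(self.batches)

# passed via batch_sampler= instead of batch_size=/shuffle=:
# loader = DataLoader(dataset, batch_sampler=ContiguousBatchSampler(len(dataset), 32), num_workers=4)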

For context, this is my H5Dataset implementation:

class H5Dataset(torch.utils.data.Dataset):
    def __init__(self, path, split, mode):
        self.file_path = path
        self.dataset = None
        self.split = split
        self.mode = mode
        with h5py.File(self.file_path, 'r') as file:
            if self.split == "pixel_values":
                self.dataset_len = len(file[self.split])
            else:
                assert (len(file[self.split]["img_id"]) == len(file[self.split]["category"])
                        == len(file[self.split]["label"]) == len(file[self.split]["attention_mask"])
                        == len(file[self.split]["input_ids"])), "non-matching number of entries in .h5 file."
                self.dataset_len = len(file[self.split]["img_id"])
            self.categories = [category.decode("utf-8") for category in np.unique(file[self.split]["category"])]
    
    def __getitem__(self, idx):
        if self.dataset is None:
            self.dataset = h5py.File(self.file_path, 'r')
        output = {}
        output["attention_mask"] = self.dataset[self.split + "/attention_mask"][idx]
        output["category"] = self.dataset[self.split + "/category"][idx].decode("utf-8")
        output["img_id"] = self.dataset[self.split + "/img_id"][idx]
        output["input_ids"] = self.dataset[self.split + "/input_ids"][idx]
        output["label"] = self.dataset[self.split + "/label"][idx]
        if self.mode == "baseline":
            output["pixel_values"] = self.dataset["pixel_values"][output["img_id"]][4]
        elif self.mode == "patching":
            output["pixel_values"] = self.dataset["pixel_values"][output["img_id"]]
        return output

    def __len__(self):
        return self.dataset_len

and in my Trainer implementation I keep the dataset related stuff (Datasets and DataLoaders) as follows:

self.train_dataset = H5Dataset(os.path.join("datasets", "dataset_folder", "dataset_file.h5"), "train", model)
self.train_loader = torch.utils.data.DataLoader(self.train_dataset, batch_size=self.batch_size,
                                                shuffle=True, num_workers=4)
self.validation_dataset = H5Dataset(os.path.join("datasets", "dataset_folder", "dataset_file.h5"), "validation", model)
self.validation_loader = torch.utils.data.DataLoader(self.validation_dataset, batch_size=self.batch_size,
                                                      shuffle=False, num_workers=4)
self.test_dataset = H5Dataset(os.path.join("datasets", "dataset_folder", "dataset_file.h5"), "test", model)
self.test_loader = torch.utils.data.DataLoader(self.test_dataset, batch_size=self.batch_size,
                                               shuffle=False, num_workers=4)

Any tips would be greatly appreciated, as I am quite new to this 🙂

Update for anyone in 2023. I don’t think you need to open the file pointer in __getitem__ anymore. The following code seems to work fine:

import h5py
import numpy as np
from torch.utils.data import DataLoader, Dataset


class H5Dataset(Dataset):
    def __init__(self, h5_path):
        self.h5_file = h5py.File(h5_path, "r")

    def __getitem__(self, index):
        return (
            self.h5_file["data"][index],
            self.h5_file["target"][index],
        )

    def __len__(self):
        return self.h5_file["target"].size


# --
# Make data
with h5py.File("test.h5", "w") as f:
    f.create_dataset("data", data=np.random.uniform(0, 1, (256, 1024, 1024)))
    f.create_dataset("target", data=np.random.choice(1000, size=256))


# Runs correctly
dataloader = DataLoader(H5Dataset("test.h5"), batch_size=32, num_workers=0, shuffle=True)

for i, (data, target) in enumerate(dataloader):
    print(data.shape)
    if i > 10:
        break

# also runs correctly
dataloader = DataLoader(H5Dataset("test.h5"), batch_size=32, num_workers=8, shuffle=True)

for i, (data, target) in enumerate(dataloader):
    print(data.shape)
    if i > 10:
        break

Hi,
I also need some guidance to better design my dataset class.

import torch
import h5py
from torch.utils import data


class HDF5Dataset(data.Dataset):
    def __init__(self, file_list):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.file_list = file_list
        self.h5_files = None

    def setup(self):
        self.h5_files = [h5py.File(f, 'r') for f in self.file_list]

    def __getitem__(self, index):
        if self.h5_files is None:
            self.setup()
        file = self.h5_files[index]
        data = file["data_id"][()]  # faster than slicing
        return (torch.tensor(data[1:, :4000, 2:258], device=self.device),
                torch.tensor(data[0, :4000, 2:258], device=self.device).unsqueeze(0))

    def __len__(self):
        return len(self.file_list)


dataset = HDF5Dataset(file_list)

training_set, validation_set = (data.random_split(dataset, [0.9, 0.1]))


train_loader = data.DataLoader(training_set, batch_size=2,
                               shuffle=False, drop_last=False, num_workers=num_workers,
                               pin_memory=pin_memory)
validation_loader = data.DataLoader(validation_set, batch_size=2,
                                    shuffle=False, drop_last=False, num_workers=num_workers,
                                    pin_memory=pin_memory)

It takes me approximately 7 minutes to perform one iteration over the training and validation sets, which together consist of around 1000 H5 files, and the actual model training/evaluation takes only about 5% of that time. I could not observe any significant speedup between num_workers=0 and a larger number, and the same holds for a larger batch size.

As I am new here, please let me know if you need further information from my side.

I wonder if this should be equivalent to setting

self.dataset = h5py.File(self.file_path, 'r')["dataset"]

directly in __init__? Why would it be necessary to open the h5py.File only when it is first used in __getitem__?
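In other words, the two variants I am comparing look roughly like this (file path and dataset name are just placeholders); the only difference is where the file gets opened:

import h5py
from torch.utils.data import Dataset

class EagerH5Dataset(Dataset):
    """Opens the file once in __init__, before the DataLoader forks its workers."""
    def __init__(self, path):
        self.data = h5py.File(path, "r")["dataset"]

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return len(self.data)

class LazyH5Dataset(Dataset):
    """Defers opening to the first __getitem__ call, i.e. once inside each worker process."""
    def __init__(self, path):
        self.path = path
        self.data = None

    def __getitem__(self, index):
        if self.data is None:
            self.data = h5py.File(self.path, "r")["dataset"]
        return self.data[index]

    def __len__(self):
        with h5py.File(self.path, "r") as f:
            return len(f["dataset"])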