Strange error when parsing JSON in multiple worker data loader

Hi @ptrblck

Actually, I am following the example here:
https://pytorch.org/docs/stable/notes/multiprocessing.html


import torch.multiprocessing as mp
from torch.utils.data import DataLoader
from model import MyModel

def train(model):
    # Construct data_loader, optimizer, etc.
    data_loader = DataLoader(test_dataset, num_workers=8, batch_size=1)

    for data, labels in data_loader:
        optimizer.zero_grad()
        loss_fn(model(data), labels).backward()
        optimizer.step()  # This will update the shared parameters

if __name__ == '__main__':
    num_processes = 4
    model = MyModel()
    # NOTE: this is required for the ``fork`` method to work
    model.share_memory()
    processes = []
    for rank in range(num_processes):
        p = mp.Process(target=train, args=(model,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

We can see there is multiprocessing in __main__ and a DataLoader inside train(model). In this case, should we avoid using a multi-worker DataLoader under multiprocessing? Otherwise, will the 4 processes, each with an 8-worker DataLoader, load the same data 4 times?

The Hogwild example will share the model and based on your code snippet will reuse the dataset in each process.
This blog post might be useful, as it’s implementing the DistributedSampler, which would only load a partition in each process.

Not necessarily, if you don’t see any performance drops or any other issues (such as the JSON issue).

I think this would be the case in your current example code, so I would suggest using a DistributedSampler as given in the linked blog post.
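Something along these lines could work inside the train() function from the example above (a rough sketch; the rank, num_processes, dataset, and num_epochs arguments are illustrative additions, and optimizer/loss_fn are the same placeholders as in the original snippet):

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

def train(model, rank, num_processes, dataset, num_epochs):
    # Each process only iterates over its own partition of the dataset.
    sampler = DistributedSampler(dataset, num_replicas=num_processes, rank=rank)
    data_loader = DataLoader(dataset, sampler=sampler, num_workers=8, batch_size=1)

    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)  # reshuffle the partition differently each epoch
        for data, labels in data_loader:
            optimizer.zero_grad()
            loss_fn(model(data), labels).backward()
            optimizer.step()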

Thanks, @ptrblck, the DistributedSampler looks like a good fit. What I am not sure about is: if it uses num_processes as num_replicas, will it just randomly split the dataset across the processes, so that each process still has some chance of processing the same data, or of missing some data, due to the randomness? Or does it ensure that each process works on non-overlapping data?

            sampler=DistributedSampler(
                dataset=dataset,
                num_replicas=num_processes,
                rank=rank
            ),

The DistributedSampler might add a few samples (attaching the first samples to the end) to make the length evenly divisible as seen in this line of code. Besides this potential repetition, there wouldn’t be an overlap and the indices would be split here.
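As a standalone illustration of that splitting and padding (not from the thread; shuffle=False is used only to make the partition easy to read):

from torch.utils.data.distributed import DistributedSampler

dataset = list(range(10))  # 10 samples, 3 replicas -> padded to 12 indices
for rank in range(3):
    sampler = DistributedSampler(dataset, num_replicas=3, rank=rank, shuffle=False)
    print(rank, list(sampler))
# 0 [0, 3, 6, 9]
# 1 [1, 4, 7, 0]  <- index 0 repeated as padding
# 2 [2, 5, 8, 1]  <- index 1 repeated as padding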

Thanks, so in this case, if I have M num_processes and each process has a DataLoader with N workers, will the data be split into N×M slices, and will __getitem__ in the DataLoader still receive indices that go over its own slice out of the N×M?

I don’t understand the “slice” notation. For M “outer processes”, the sampler will split the indices into M chunks and each process will only load the data using its current indices.
Each worker in the DataLoaders will create a batch by passing the index to Dataset.__getitem__ and calling into the collate_fn to create the final batch of all samples.
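A minimal sketch of that flow (illustrative only): __getitem__ returns one sample per index, and the default collate_fn stacks whatever a worker fetched into a batch:

import torch
from torch.utils.data import Dataset, DataLoader

class MyDataset(Dataset):
    def __init__(self, data):
        self.data = data  # e.g. tensors, file paths, ...

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        # Load and return exactly one sample for this index.
        return self.data[index]

loader = DataLoader(MyDataset(torch.arange(32)), batch_size=4, num_workers=4)
for batch in loader:
    print(batch.shape)  # [4], stacked by the default collate_fn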

Thanks, @ptrblck. By slices I mean subsets of the full data. Suppose my dataset is the list [1, 2, ..., 32] and I have 2 outer processes, each with a 4-worker DataLoader. Does the data get split into something like

  • process 1 will load
[1,2,3,4]
...
[13,14,15,16]
  • process 2 will load
[17,18,19,20]
...
[29,30,31,32]

So in each process I have a 4-worker DataLoader; does that mean, say, one worker in process 1 will only load
[1,2,3,4]?

In this case, will Dataset.__getitem__ receive indices from 0-3, corresponding to the data in [1,2,3,4]?

By default the __getitem__ will get a single index, so you should implement the logic in this function to load and process only a single sample.
There are ways using a BatchSampler to provide multiple indices, but that’s a special case and it doesn’t seem that it would fit your use case.

Each process will load a subset of the data, yes that’s correct.

Each worker will create a batch using some indices, which are provided.
The first worker in this particular process might load samples [1, 2, 3, 4] sequentially and create the batch using these samples.
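If you want to see that assignment yourself, a small sketch (illustrative) that prints which worker fetched which index from inside __getitem__:

import torch
from torch.utils.data import Dataset, DataLoader, get_worker_info

class Peek(Dataset):
    def __len__(self):
        return 16

    def __getitem__(self, index):
        info = get_worker_info()  # None when loading in the main process
        print(f"worker {info.id if info else 'main'} got index {index}")
        return index

loader = DataLoader(Peek(), batch_size=4, num_workers=4)
for batch in loader:
    pass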

Thanks. Yes, I want to load only one item per slice. In fact, my system has 20K+ training data files, so each __getitem__ returns one file. My batch_size is then the number of files loaded per item, and it should be only 1.

I am doing it the same way, but it doesn't seem to work as I expect. Let me update the thread after some more investigation.
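For reference, a sketch of that file-per-item pattern (the glob pattern and the "features"/"label" JSON keys are placeholders, not your actual format):

import glob
import json
import torch
from torch.utils.data import Dataset

class FileDataset(Dataset):
    def __init__(self, file_paths):
        self.file_paths = file_paths  # e.g. the 20K+ training files

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, index):
        # Parse exactly one file per index; keys below are placeholders.
        with open(self.file_paths[index]) as f:
            record = json.load(f)
        return torch.tensor(record["features"]), torch.tensor(record["label"])

# dataset = FileDataset(sorted(glob.glob("data/*.json")))
# sampler = DistributedSampler(dataset, num_replicas=num_processes, rank=rank)
# loader = DataLoader(dataset, sampler=sampler, batch_size=1, num_workers=8)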

Hi @ptrblck, I have made it work via distributed sampling, thanks for the help. One question: since the different processes pick up different subsets of the data, how do these processes update the same model without a race condition? Or is this something we don't need to worry about? Right now I am still using one GPU, so this multiprocessing only speeds up the data processing: each process handles a subset of the data and converts it to tensors, and then all tensors are queued to one GPU. So only one model is trained on one GPU and we don't need to worry about race conditions?

DistributedDataParallel will take care of all necessary synchronizations and communications, so that you don’t need to worry about race conditions.
However, if you are referring to the Hogwild training: the original Hogwild paper claims that the race conditions don’t hurt the performance and are expected.

HOGWILD paper

We present an update scheme called HOGWILD! which allows processors access to shared memory with the possibility of overwriting each other’s work. We show that when the associated optimization problem is sparse, meaning most gradient updates only modify small parts of the decision variable, then HOGWILD! achieves a nearly optimal rate of convergence. We demonstrate experimentally that HOGWILD! outperforms alternative schemes that use locking by an order of magnitude.

Thanks @ptrblck

In general, there seem to be two ways to share the model/tensors. One is via DataParallel (assuming I have no multi-node setup, only one node with 4 GPUs); in this case we can leverage multiple GPUs, but some unbalanced GPU usage may happen. Our model is small, so I guess that won't happen much. Can I still use DistributedDataParallel even though I only have one node, i.e. not a really distributed setup? It seems we could benefit from the synchronizations and communications DistributedDataParallel uses.

The other way is the Hogwild approach you mentioned, which is the method we are currently using, via shared memory. For a single-node case, is shared memory more efficient than DataParallel or DistributedDataParallel? Or, if the Hogwild way has some impact on model performance, can we run DistributedDataParallel even in a single-node, single-GPU case and still get the benefit of multiprocessing to speed up the CPU work and feed tensors to the GPU as fast as possible?

Yes, and we recommend using DistributedDataParallel even on a single node with multiple GPUs, where each process will use a single device, as it should be the fastest parallel approach.

I personally never used Hogwild, so cannot comment on the performance (speed and accuracy) of this approach.
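For completeness, a minimal single-node DistributedDataParallel sketch (MyModel, loss_fn, and the dataset are placeholders; one process per GPU):

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

def run(rank, world_size, dataset):
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

    model = MyModel().to(rank)               # placeholder model
    ddp_model = DDP(model, device_ids=[rank])

    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    loader = DataLoader(dataset, sampler=sampler, batch_size=1, num_workers=4)

    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=1e-3)
    for data, labels in loader:
        optimizer.zero_grad()
        loss_fn(ddp_model(data.to(rank)), labels.to(rank)).backward()
        optimizer.step()                     # gradients are all-reduced by DDP

    dist.destroy_process_group()

if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    mp.spawn(run, args=(world_size, dataset), nprocs=world_size)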

Hi, I have the same problem when I am running my code with num_workers > 0 …
I am running my code on a cluster node with 4 GPUs and 360 GB of memory. How can I fix this problem? When I replace num_workers with pin_memory=True, it works, but very, very slowly.

Any help would be appreciated.

A Segmentation fault points towards a memory access violation and the worker is thus dying.
It could be an issue with e.g. the data loading itself (via a 3rd party library) and we would need to get more information to help more.

What do you mean by a third-party library?
This is the class for the dataset…

import pandas as pd
import torch
from torch.utils.data import Dataset

class DatasetCSV(Dataset):
    def __init__(self, csv_file, transform=None):
        """Initializes an instance of the class.

        Args:
            csv_file (str): Path to the csv file with the data.
        """
        self.df = pd.read_csv(csv_file)
        self.X = self.df.drop("class", axis=1)
        self.y = self.df["class"]
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        # Convert idx from tensor to list due to pandas bug (that arises when using pytorch's random_split)
        if isinstance(idx, torch.Tensor):
            idx = idx.tolist()

        localX = self.X.iloc[idx].values
        localY = self.y[idx]
        # print(type(localX), type(localY))
        if self.transform:
            localX = torch.from_numpy(localX)
            localY = torch.tensor(localY)

        return [localX, localY]

def train(self, max_epoch=50, batch_size=500, test_interval=10, num_workers=4):
    # Data Load
    train_set = DatasetCSV("data/train.csv", transform=True)

The 3rd party library would be pandas in this case.
Try to update all libs to the latest release and feel free to post an executable code snippet to reproduce this issue in case you get stuck.
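In case it helps, a self-contained sanity check along those lines could look like this, using the DatasetCSV class above with a small synthetic CSV (illustrative only); if it passes with num_workers=0 but crashes with num_workers=4, the worker processes are the right place to look:

import os
import pandas as pd
from torch.utils.data import DataLoader

if __name__ == "__main__":
    # Write a small synthetic CSV so the check is self-contained.
    os.makedirs("data", exist_ok=True)
    pd.DataFrame({"a": range(100), "b": range(100),
                  "class": [0, 1] * 50}).to_csv("data/train.csv", index=False)

    train_set = DatasetCSV("data/train.csv", transform=True)
    for num_workers in (0, 4):
        loader = DataLoader(train_set, batch_size=10, num_workers=num_workers)
        for X, y in loader:
            pass
        print(f"num_workers={num_workers}: OK")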