DataLoader fails when using num_workers>0 with multiprocessing on Ubuntu

Hi All,
I’m facing a strange issue. I’m trying to make my CNN (PINet, a lane detection CNN) compatible with distributed training using DistributedDataParallel.

My problem:

  • The data loader fails when I use num_worker>0 and launch my training function with torch.multiprocessing.spawn().

  • Without multiprocessing, I do not have any issue with num_worker being > 0.

Some info on my setup:
I have a single-node machine with 2 GPUs.

Some info on essential config:
batch-size: 6
num-worker:2

Here is the complete stack trace.

Traceback (most recent call last):
  File "/snap/pycharm-community/233/plugins/python-ce/helpers/pydev/pydevd.py", line 1477, in _exec
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "/snap/pycharm-community/233/plugins/python-ce/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/home/mvish7/PINet/core/train.py", line 222, in <module>
    mp.spawn(training, nprocs=world_size, join=True)
  File "/home/anaconda3/envs/cam_section/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 200, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/home/anaconda3/envs/cam_section/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 158, in start_processes
    while not context.join():
  File "/home/anaconda3/envs/cam_section/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 119, in join
    raise Exception(msg)
Exception: 

-- Process 0 terminated with the following error:
Traceback (most recent call last):
  File "/home/anaconda3/envs/cam_section/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 20, in _wrap
    fn(i, *args)
  File "/home/mvish7/PINet/core/train.py", line 127, in training
    for t_batch, sample in enumerate(train_loader):
  File "/home/anaconda3/envs/cam_section/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 291, in __iter__
    return _MultiProcessingDataLoaderIter(self)
  File "/home/anaconda3/envs/cam_section/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 737, in __init__
    w.start()
  File "/home/anaconda3/envs/cam_section/lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/home/anaconda3/envs/cam_section/lib/python3.6/multiprocessing/context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/home/anaconda3/envs/cam_section/lib/python3.6/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/home/anaconda3/envs/cam_section/lib/python3.6/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/home/anaconda3/envs/cam_section/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/home/anaconda3/envs/cam_section/lib/python3.6/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/home/anaconda3/envs/cam_section/lib/python3.6/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: 'NoneType' object is not callable

Here is some demo code (not sufficient to reproduce the error; apologies, as I can’t make some parts of the code public).


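import os
import socket
import sys
import time
from contextlib import closing

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler

# cfg, Generator and PINet come from the project's own modules (not shown here)


def find_free_port():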
    """ https://stackoverflow.com/questions/1365265/on-localhost-how-do-i-pick-a-free-port-number """

    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        s.bind(('', 0))
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        return str(s.getsockname()[1])


def init_process(rank, world_size):
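    # IP address of the machine on which process 0 is located
    os.environ['MASTER_ADDR'] = '192.168.178.26'
    # free port on the machine on which process 0 is located
    # NOTE: all ranks must agree on MASTER_PORT; calling find_free_port() inside
    # each spawned process can give every rank a different port
    os.environ['MASTER_PORT'] = find_free_port()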
    dist.init_process_group(
        backend="nccl", init_method='env://', world_size=world_size, rank=rank)
    dist.barrier()

def setup_train_loader(train_dataset, train_sampler, world_size, rank):

    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=int(cfg.batch_size/world_size),
        num_workers=cfg.num_worker,
        shuffle=(train_sampler is None),
        sampler=train_sampler,
        drop_last=True,
        pin_memory=True,
    )
    return train_loader

def setup_val_loader(val_dataset, val_sampler, world_size, rank):
    val_loader = torch.utils.data.DataLoader(
        val_dataset,
        batch_size=int(cfg.batch_size/world_size),
        num_workers= cfg.num_worker,
        shuffle=(val_sampler is None),
        sampler=val_sampler,
        drop_last=True,
        pin_memory=True,
    )
    return val_loader

def training(rank):  # name matches the mp.spawn(training, ...) call and the traceback

    world_size = torch.cuda.device_count()

    init_process(rank, world_size)
    if dist.is_initialized():
        print(f"Rank {rank + 1}/{world_size} process initialized.\n")
    else:
        sys.exit()

    torch.manual_seed(0)
    torch.cuda.set_device(rank)

    print('Getting dataset')
    train_dataset = Generator(cfg, mode='Train')
    val_dataset = Generator(cfg, mode='Validate')

    # setting up model
    Model = PINet()
    # setting up optimizer
    # setting up scheduler

    print('Setting up dataloader')
    if dist.is_initialized():
        train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)
    else:
        train_sampler = None

    train_loader = setup_train_loader(train_dataset, train_sampler, world_size, rank)

    if dist.is_initialized():
        val_sampler = DistributedSampler(val_dataset, num_replicas=world_size, rank=rank)
    else:
        val_sampler = None

    val_loader = setup_val_loader(val_dataset, val_sampler, world_size, rank)

    start_time = time.time()
    for epoch in range(cfg.n_epoch):
        Model.training_mode()
        if dist.is_initialized():
            # reshuffle the per-rank shards each epoch
            train_sampler.set_epoch(epoch)

        for t_batch, sample in enumerate(train_loader):
            imgs = sample['imgs']
            labels = sample['labels']
            # regular training loop



if __name__ == '__main__':
    # training()
    # considering 1 machine and N GPUs. for multiple machines and multiple GPUs, this process gets complicated.
    if torch.cuda.is_available():
        world_size = torch.cuda.device_count()
        mp.spawn(training, nprocs=world_size, join=True)

The configurations I’m using:

Ubuntu 18.04
CUDA 10.2
PyTorch 1.6.0
PyCharm 2020.0.3

Can someone guide me here?

Best,
mvish7

@VitalyFedyunin for dataloader questions.

Hi @VitalyFedyunin
Could you please have a look at this issue?

Hi,

I managed to solve the problem. I still don’t know what went wrong inside the torch data loader, but here is what worked for me.

When creating the dataset instance, I was doing this:

train_dataset = Generator(cfg, mode='Train')

In the __init__() method of the Generator class, I was doing this:

self.cfg = cfg
self.actual_batchsize = None
self.mode = mode
self.dataset_size = None
self.train_data = []
self.test_data = []
self.val_data = []
self.process_data = ProcessData(cfg)

Here ‘cfg’ was an instance of a class holding some configuration values, and ‘ProcessData(cfg)’ also created an instance of a class.

When I removed the ‘cfg’ and ‘ProcessData’ instances from __init__(), the data loader started working fine, even with num_worker>0 and torch multiprocessing.

I don’t understand this behavior. What is the root cause of this problem?

best,
mvish7

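I am not sure about the implementation of your ProcessData. But based on the traceback and code, I suspect that some object in either cfg or process_data cannot be pickled by Python.
In the DataLoader, we pass the dataset into the worker loop to fetch data. With the spawn start method (visible in the traceback as popen_spawn_posix.py and reduction.dump), the dataset has to be pickled when each worker process is started.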

Since the error happens when the workers are started, I suspect there is something within your process_data that cannot be pickled.
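
To make the suspected mechanism concrete, here is a minimal sketch that uses only the standard library. EagerDataset, LazyDataset and can_pickle are hypothetical names, and threading.Lock is just a stand-in for whatever unpicklable member cfg or ProcessData might hold; this is an assumption for illustration, not your actual code.

```python
import pickle
import threading


class EagerDataset:
    """Hypothetical dataset that builds an unpicklable helper in __init__."""

    def __init__(self):
        # Stand-in for any member that pickle cannot serialize.
        self.lock = threading.Lock()


class LazyDataset:
    """Same idea, but the helper is only created on first use."""

    def __init__(self):
        self._lock = None  # nothing unpicklable is stored at construction time

    @property
    def lock(self):
        # Created lazily, i.e. after the object has already been sent to a worker.
        if self._lock is None:
            self._lock = threading.Lock()
        return self._lock


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


if __name__ == "__main__":
    print("eager:", can_pickle(EagerDataset()))
    print("lazy:", can_pickle(LazyDataset()))
```

If the real dataset behaves like EagerDataset here, removing the member from __init__() (or creating it lazily) would be expected to make the worker start-up succeed, which matches what you observed.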

To verify this, can you try to pickle.dump a process_data object and see whether it raises an error?
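
One rough way to run that check attribute by attribute is sketched below. find_unpicklable_attributes and FakeProcessData are hypothetical names introduced for illustration; the real ProcessData is not shown in this thread, so the lambda member is only an assumed example of something pickle rejects. The sketch uses plain pickle rather than the ForkingPickler from the traceback, which should be close enough for a first check.

```python
import pickle


def find_unpicklable_attributes(obj):
    """Return {attribute_name: error_message} for attributes that fail to pickle."""
    failures = {}
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except Exception as exc:  # pickle can raise several exception types
            failures[name] = f"{type(exc).__name__}: {exc}"
    return failures


class FakeProcessData:
    """Hypothetical stand-in for ProcessData (the real class is not public)."""

    def __init__(self):
        self.scale = 0.5                  # plain data pickles fine
        self.transform = lambda x: x * 2  # lambdas cannot be pickled


if __name__ == "__main__":
    for name, error in find_unpicklable_attributes(FakeProcessData()).items():
        print(f"{name}: {error}")
```

Calling find_unpicklable_attributes(train_dataset.process_data) and find_unpicklable_attributes(cfg) in your own script should narrow down which member is responsible, assuming both objects keep their state in a regular __dict__.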