Multiple DataLoaders with DistributedDataParallel can't find default process group

I’m currently trying to run an NLP model using DistributedDataParallel and I’ve been receiving the following error if I use more than one worker for DataLoader (this error appears for each worker process):

Traceback (most recent call last):
  File "<string>", line 1 in <module>
  File "/opt/conda/lib/python3.6/multiprocessing/spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "/opt/conda/lib/python3.6/multiprocessing/spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
  File "/opt/conda/lib/site-packages/torch/nn/parallel/distributed.py", line 396, in __setstate__
    self.process_group = _get_default_group()
  File "/opt/conda/lib/site-packages/torch/distributed/distributed_c10d.py", line 286, in _get_default_group
    raise RuntimeError("Default process group has not been initialized, "
RuntimeError: Default process group has not been initialized, please make sure to call init_process_group.

Inside main(), which is the function I pass to torch.multiprocessing.spawn(), I make the following call:

dist.init_process_group("nccl", rank=rank, world_size=args.gpus, init_method="file:///app/tmp/sharedfile")

I don’t receive this error if I set the number of workers to 0. I still receive this error if I change init_method to env:// (and I have the port and address variables set). I would like this to work in file mode though, since I can’t change the size of /dev/shm.
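
For reference, the env:// variant looked roughly like this (just a sketch; the address and port values here are placeholders):

import os
import torch.distributed as dist

os.environ["MASTER_ADDR"] = "127.0.0.1"  # placeholder address
os.environ["MASTER_PORT"] = "29500"      # placeholder port
dist.init_process_group("nccl", rank=rank, world_size=args.gpus, init_method="env://")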

The error seems to trigger when I start iterating through the DataLoader for an epoch (so the error happens before a single training loop even begins).

I’m using 4 GPUs on a single-node CentOS Docker image with PyTorch 1.4.0 and Python 3.6.9. Let me know if you need further info; I appreciate any tips!

Hey @claracurrier, could you please share a code snippet?

Did you do something like the following? If so, note that the default process group is per-process state, so you will need to call init_process_group at the beginning of the spawn target function (i.e., target_fn), not after spawn in the main function.

def main():
    spawn(target_fn, args=(...))
    init_process_group(....)  # <-- too late: this only runs in the parent process, after spawn
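
For comparison, the intended ordering looks roughly like this (just a sketch; the backend, file path, and world size below are placeholders):

import torch.distributed as dist
import torch.multiprocessing as mp

def target_fn(rank, world_size):
    # runs in every spawned process, so each one initializes its own default group first
    dist.init_process_group("nccl", rank=rank, world_size=world_size,
                            init_method="file:///tmp/sharedfile")
    # ... build the model, wrap it in DistributedDataParallel, create DataLoaders ...

def main():
    world_size = 4  # placeholder
    mp.spawn(target_fn, args=(world_size,), nprocs=world_size, join=True)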

Sorry, here’s more code. I’m following the guides from the tutorials and the documentation.

def main(rank, args):
  dist.init_process_group(...)
  # ... load data ...
  train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset.lengths(), 
    num_replicas = args.gpus, 
    rank = rank, 
    shuffle = True
  )
  train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size = args.batch_size,
    sampler = train_sampler,  # pass the DistributedSampler to the loader
    num_workers = args.data_workers,
    collate_fn = mybatchifyfunc,
    pin_memory = True,
    drop_last = True,
    shuffle = False
  )

  # ... machine learning ...

if __name__ == "__main__":
  # ... set up logging, parse args...
  torch.multiprocessing.spawn(main, args=(args,), nprocs=args.gpus, join=True)

@mrshenli Sorry to bump, but I’m still not able to figure out the error. I’ve been cross-checking with examples and, as far as I can see, I’m following them. If the cause of the error is outside the distributed package, I can’t tell, because the error is thrown during spawn.

Hey @claracurrier

The above code looks correct to me. Is it possible to share a repro so that I can help debug locally?

Regarding the original error message you posted, it looks like the program is trying to pass a DistributedDataParallel object through the spawn args, and hence unpickling it triggered the error. What’s in args=(args,) when you call spawn?

  File "/opt/conda/lib/python3.6/multiprocessing/spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
  File "/opt/conda/lib/site-packages/torch/nn/parallel/distributed.py", line 396, in __setstate__
    self.process_group = _get_default_group()

Thanks for the quick reply @mrshenli

For args, it’s a Namespace object from argparse that contains all my ML parameters. It is fairly long, but it only contains ints, floats, str, lists, and bools. The DistributedDataParallel object is not passed.

Unfortunately I’m working on a remote instance that makes copying and pasting difficult so it’ll take a little while to get a minimal reproduction. I’ll post here when I have it.

@mrshenli in the course of building a barebones repro, I discovered the source of the error: I was accidentally passing something unpicklable into my Dataset instance (I was passing my model instead of the args), so that it looked like this:

def main(rank, args):
  dist.init_process_group(...)
  dummy_train = []
  model = dummyModel(args)
  model.parallelize(rank)

  train_dataset = MyNLPDataset(dummy_train, args)
  # in the broken version, `model` was accidentally passed here instead of `args`,
  # which is what triggered the error

  train_sampler = torch.utils.data.distributed.DistributedSampler(...)
  train_loader = torch.utils.data.DataLoader(train_dataset ...)
  
  for epoch in range(args.num_epochs):
    for training_ex in train_loader:
      output = model.update(training_ex)

Where the dataset class was:

class MyNLPDataset(torch.utils.data.Dataset):
  def __init__(self, examples, args):
    self.examples = examples
    self.args = args # <-- previously this was saving a copy of the model object

  # ... overridden methods ...

I think this could’ve been easier with a better error message - your comment about something being unpicklable helped me narrow my search. The error ultimately didn’t involve the process group at all; I think it only showed up with num_workers > 0 because the DataLoader workers have to unpickle the dataset (and, with it, the model it was accidentally holding).
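
For anyone who hits the same thing, a quick sanity check (just a sketch, not part of my actual code) is to try pickling the dataset up front, since that’s effectively what happens when the DataLoader spawns its worker processes:

import pickle

try:
    pickle.dumps(train_dataset)  # raises if the dataset holds something unpicklable (e.g. a DDP-wrapped model)
except Exception as exc:
    raise RuntimeError(f"Dataset is not picklable: {exc}")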

I’m now getting a new NCCL backend error that doesn’t occur with the Gloo backend, so I’m back to hunting for new issues. Thanks so much for your help!
