Hi All,
I’m facing a strange issue. I’m trying to make my CNN (PINet, a lane-detection CNN) compatible with distributed training using DistributedDataParallel.
My problem:
- The data loader fails when I use num_workers > 0 and launch my training function through torch.multiprocessing.spawn().
- Without multiprocessing, I do not have any issue with num_workers > 0.
Some info on my setup:
I have a single-node machine with 2 GPUs.
Some info on essential config:
batch-size: 6
num-worker:2
Here is the complete stack trace.
Traceback (most recent call last):
File "/snap/pycharm-community/233/plugins/python-ce/helpers/pydev/pydevd.py", line 1477, in _exec
pydev_imports.execfile(file, globals, locals) # execute the script
File "/snap/pycharm-community/233/plugins/python-ce/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "/home/mvish7/PINet/core/train.py", line 222, in <module>
mp.spawn(training, nprocs=world_size, join=True)
File "/home/anaconda3/envs/cam_section/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 200, in spawn
return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
File "/home/anaconda3/envs/cam_section/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 158, in start_processes
while not context.join():
File "/home/anaconda3/envs/cam_section/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 119, in join
raise Exception(msg)
Exception:
-- Process 0 terminated with the following error:
Traceback (most recent call last):
File "/home/anaconda3/envs/cam_section/lib/python3.6/site-packages/torch/multiprocessing/spawn.py", line 20, in _wrap
fn(i, *args)
File "/home/mvish7/PINet/core/train.py", line 127, in training
for t_batch, sample in enumerate(train_loader):
File "/home/anaconda3/envs/cam_section/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 291, in __iter__
return _MultiProcessingDataLoaderIter(self)
File "/home/anaconda3/envs/cam_section/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 737, in __init__
w.start()
File "/home/anaconda3/envs/cam_section/lib/python3.6/multiprocessing/process.py", line 105, in start
self._popen = self._Popen(self)
File "/home/anaconda3/envs/cam_section/lib/python3.6/multiprocessing/context.py", line 223, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "/home/anaconda3/envs/cam_section/lib/python3.6/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/home/anaconda3/envs/cam_section/lib/python3.6/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/home/anaconda3/envs/cam_section/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/home/anaconda3/envs/cam_section/lib/python3.6/multiprocessing/popen_spawn_posix.py", line 47, in _launch
reduction.dump(process_obj, fp)
File "/home/anaconda3/envs/cam_section/lib/python3.6/multiprocessing/reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
TypeError: 'NoneType' object is not callable
Here is some demo code (not sufficient to reproduce the error – apologies, as I can’t make some parts of the code public):
def find_free_port():
    """Ask the OS for a currently unused TCP port and return it as a string.

    Approach taken from
    https://stackoverflow.com/questions/1365265/on-localhost-how-do-i-pick-a-free-port-number

    The probing socket is closed before returning, so the port is only
    known to have been free at the time of the call; it is not reserved.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with closing(probe):
        # Binding to port 0 lets the OS choose any free port.
        probe.bind(('', 0))
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _host, port = probe.getsockname()
        return str(port)
def init_process(rank, world_size):
    """Join the default NCCL process group and wait until every rank has joined.

    Args:
        rank: index of the calling process within the group.
        world_size: total number of processes taking part.

    The group is initialised through ``env://``, which reads MASTER_ADDR and
    MASTER_PORT from the environment. Both values must be identical on every
    rank, otherwise the ranks never find each other.
    """
    # IP address of the machine on which process 0 is located.
    os.environ['MASTER_ADDR'] = '192.168.178.26'
    # Port on the machine on which process 0 is located. This function runs
    # once in every spawned process, so probing for a free port here could
    # hand each rank a different port. Honour a port exported by the
    # launcher and otherwise fall back to one fixed port shared by all ranks.
    os.environ.setdefault('MASTER_PORT', '29500')
    dist.init_process_group(
        backend="nccl", init_method='env://', world_size=world_size, rank=rank)
    # Block until all ranks have reached this point.
    dist.barrier()
def setup_train_loader(train_dataset, train_sampler, world_size, rank):
    """Build the training DataLoader for one process.

    Args:
        train_dataset: dataset to draw training samples from.
        train_sampler: sampler to use, or None to let the loader shuffle.
        world_size: number of processes; the configured batch size is
            divided by it to get the per-process batch size.
        rank: accepted for signature symmetry; not used here.

    Returns:
        The configured ``torch.utils.data.DataLoader``.
    """
    per_process_batch = int(cfg.batch_size / world_size)
    loader_options = dict(
        batch_size=per_process_batch,
        num_workers=cfg.num_worker,
        # Shuffle inside the loader only when no sampler is supplied.
        shuffle=train_sampler is None,
        sampler=train_sampler,
        drop_last=True,
        pin_memory=True,
    )
    return torch.utils.data.DataLoader(train_dataset, **loader_options)
def setup_val_loader(val_dataset, val_sampler, world_size, rank):
    """Build the validation DataLoader for one process.

    Args:
        val_dataset: dataset to draw validation samples from.
        val_sampler: sampler to use, or None to let the loader shuffle.
        world_size: number of processes; the configured batch size is
            divided by it to get the per-process batch size.
        rank: accepted for signature symmetry; not used here.

    Returns:
        The configured ``torch.utils.data.DataLoader``.
    """
    per_process_batch = int(cfg.batch_size / world_size)
    loader_options = dict(
        batch_size=per_process_batch,
        num_workers=cfg.num_worker,
        # Shuffle inside the loader only when no sampler is supplied.
        shuffle=val_sampler is None,
        sampler=val_sampler,
        drop_last=True,
        pin_memory=True,
    )
    return torch.utils.data.DataLoader(val_dataset, **loader_options)
def train(rank):
    """Per-process training entry point: one process per GPU.

    Args:
        rank: index of this process; also used as the CUDA device index.

    Initialises the process group, builds datasets, model and data loaders,
    then iterates over the training loader for ``cfg.n_epoch`` epochs.
    """
    world_size = torch.cuda.device_count()
    init_process(rank, world_size)

    # Bail out early if the process group did not come up.
    if not dist.is_initialized():
        sys.exit()
    print(f"Rank {rank + 1}/{world_size} process initialized.\n")

    torch.manual_seed(0)
    torch.cuda.set_device(rank)

    print('Getting dataset')
    train_dataset = Generator(cfg, mode='Train')
    val_dataset = Generator(cfg, mode='Validate')

    # setting up model
    Model = PINet()
    # setting up optimizer
    # setting up scheduler

    print('Setting up dataloader')
    # Use a DistributedSampler per split when running distributed;
    # otherwise pass None so the loader shuffles on its own.
    train_sampler = (
        DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)
        if dist.is_initialized()
        else None
    )
    train_loader = setup_train_loader(train_dataset, train_sampler, world_size, rank)

    val_sampler = (
        DistributedSampler(val_dataset, num_replicas=world_size, rank=rank)
        if dist.is_initialized()
        else None
    )
    val_loader = setup_val_loader(val_dataset, val_sampler, world_size, rank)

    start_time = time.time()
    for epoch in range(cfg.n_epoch):
        Model.training_mode()
        if dist.is_initialized():
            # Pass the epoch number to the sampler at the start of each epoch.
            train_sampler.set_epoch(epoch)
        for t_batch, sample in enumerate(train_loader):
            imgs = sample['imgs']
            labels = sample['labels']
            # regular training loop
if __name__ == '__main__':
    # Considering 1 machine and N GPUs. For multiple machines and multiple
    # GPUs, this process gets complicated.
    if torch.cuda.is_available():
        # One worker process per visible GPU.
        world_size = torch.cuda.device_count()
        # mp.spawn invokes the target as fn(rank, *args). The worker defined
        # in this file is train(rank); the previous target name `training`
        # is not defined here and would raise NameError.
        mp.spawn(train, nprocs=world_size, join=True)
The configurations I’m using:
Ubuntu 18.04
CUDA 10.02
PyTorch 1.6.0
PyCharm 2020.0.3
Can someone guide me here?
Best,
mvish7