Hi, I am trying to use DistributedDataParallel for my job. I wrote the code below, but it is not working properly. It would be very kind if someone could help me find the problem.
import os
import argparse

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.distributed as dist
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

class Model(nn.Module):
    # Our model
    def __init__(self):
        super(Model, self).__init__()
        self.fc1 = nn.Conv2d(1, 10, 3)
        self.bn1 = nn.BatchNorm2d(10)
        self.fc2 = nn.Conv2d(10, 20, 3)
        self.bn2 = nn.BatchNorm2d(20)
        self.fc3 = nn.Linear(11520, 10)

    def forward(self, x):
        print(f'input_size: {x.size()}')
        x = F.relu(self.fc1(x))
        x = self.bn1(x)
        x = F.relu(self.fc2(x))
        x = self.bn2(x)
        x = x.view(x.size(0), -1)
        x = self.fc3(x)
        print(f'output_size: {x.size()}')
        return x
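(The 11520 in fc3 should be right for MNIST: two 3x3 convolutions without padding shrink 28x28 to 26x26 and then 24x24, so the flattened size is 20 * 24 * 24 = 11520. A quick standalone shape check, separate from the training script:)

model = Model()
out = model(torch.randn(2, 1, 28, 28))  # fake batch of two MNIST-sized images
assert out.size() == (2, 10)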
########################################
def train(gpu, args):
########################################
    # mp.spawn passes the local process index as the first positional
    # argument, so `gpu` is this process's gpu index on its node
    rank = args.nr * args.gpus + gpu  # global rank of this process
    dist.init_process_group(backend='nccl',
                            init_method='env://',
                            world_size=args.world_size,
                            rank=rank)
    torch.manual_seed(0)
    model = Model()
    torch.cuda.set_device(gpu)
    device = torch.device(f'cuda:{gpu}')
    model = model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.1)
    lr_sch = lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.1)
    criterion = nn.CrossEntropyLoss().to(device)
    ######################################
    model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])
    #####################################
    mnist = torchvision.datasets.MNIST('./data', train=True, download=True,
                                       transform=transforms.ToTensor())
    ####################################
    train_sampler = torch.utils.data.distributed.DistributedSampler(mnist,
                                                                    num_replicas=args.world_size,
                                                                    rank=rank)
    ###################################
    dataloader = DataLoader(mnist, batch_size=32, num_workers=4, pin_memory=True,
                            sampler=train_sampler)
#####################################
    for epoch in range(args.epochs):
        train_sampler.set_epoch(epoch)  # so each epoch draws a different shuffle
        total_loss = 0
        for X, y in dataloader:
            X = X.to(device)
            y = y.long().to(device)
            pred = model(X)
            loss = criterion(pred, y)
            total_loss += loss.item()
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        lr_sch.step()
        print(f'Loss: {total_loss/len(dataloader)}')  # per-rank average loss for the epoch
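(For context, my understanding is that the DistributedSampler gives each rank a disjoint shard of the dataset, so the per-rank loaders don't overlap. A toy check with a dummy dataset instead of MNIST, run outside the distributed setup:)

import torch
from torch.utils.data import TensorDataset
from torch.utils.data.distributed import DistributedSampler

toy = TensorDataset(torch.arange(8))
for r in range(2):
    sampler = DistributedSampler(toy, num_replicas=2, rank=r, shuffle=False)
    print(r, list(sampler))  # rank 0 -> [0, 2, 4, 6], rank 1 -> [1, 3, 5, 7]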
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--nodes', default=1,
                        type=int, metavar='N')
    parser.add_argument('-g', '--gpus', default=1, type=int,
                        help='number of gpus per node')
    parser.add_argument('-nr', '--nr', default=0, type=int,
                        help='ranking within the nodes')
    parser.add_argument('--epochs', default=2, type=int,
                        metavar='N',
                        help='number of total epochs to run')
    args = parser.parse_args()
    #########################################################
    args.world_size = args.gpus * args.nodes         # total number of processes, one per gpu
    os.environ['MASTER_ADDR'] = '172.20.24.55'       # the IP address where the rank-0 process runs
    os.environ['MASTER_PORT'] = '8890'
    mp.spawn(train, args=(args,), nprocs=args.gpus)  # one process per gpu on this node
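One thing worth noting (and why train takes gpu as its first argument above): as far as I understand, mp.spawn calls the target as fn(i, *args), injecting the process index i = 0 ... nprocs-1. A minimal sketch of that convention, separate from the training script:

import torch.multiprocessing as mp

def worker(idx, msg):
    # idx is injected by mp.spawn: 0, 1, ..., nprocs-1
    print(f'process {idx} got: {msg}')

if __name__ == '__main__':
    mp.spawn(worker, args=('hello',), nprocs=2)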
When I run this, I get the following error:
--> 125 mp.spawn(train,args=(args,),nprocs=args.world_size) #
126 #########################################################
127
~/anaconda3/lib/python3.7/site-packages/torch/multiprocessing/spawn.py in spawn(fn, args, nprocs, join, daemon, start_method)
198 ' torch.multiprocessing.start_process(...)' % start_method)
199 warnings.warn(msg)
--> 200 return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
~/anaconda3/lib/python3.7/site-packages/torch/multiprocessing/spawn.py in start_processes(fn, args, nprocs, join, daemon, start_method)
147 daemon=daemon,
148 )
--> 149 process.start()
150 error_queues.append(error_queue)
151 processes.append(process)
~/anaconda3/lib/python3.7/multiprocessing/process.py in start(self)
110 'daemonic processes are not allowed to have children'
111 _cleanup()
--> 112 self._popen = self._Popen(self)
113 self._sentinel = self._popen.sentinel
114 # Avoid a refcycle if the target function holds an indirect
~/anaconda3/lib/python3.7/multiprocessing/context.py in _Popen(process_obj)
282 def _Popen(process_obj):
283 from .popen_spawn_posix import Popen
--> 284 return Popen(process_obj)
285
286 class ForkServerProcess(process.BaseProcess):
~/anaconda3/lib/python3.7/multiprocessing/popen_spawn_posix.py in __init__(self, process_obj)
30 def __init__(self, process_obj):
31 self._fds = []
---> 32 super().__init__(process_obj)
33
34 def duplicate_for_child(self, fd):
~/anaconda3/lib/python3.7/multiprocessing/popen_fork.py in __init__(self, process_obj)
18 self.returncode = None
19 self.finalizer = None
---> 20 self._launch(process_obj)
21
22 def duplicate_for_child(self, fd):
~/anaconda3/lib/python3.7/multiprocessing/popen_spawn_posix.py in _launch(self, process_obj)
40 tracker_fd = semaphore_tracker.getfd()
41 self._fds.append(tracker_fd)
---> 42 prep_data = spawn.get_preparation_data(process_obj._name)
43 fp = io.BytesIO()
44 set_spawning_popen(self)
~/anaconda3/lib/python3.7/multiprocessing/spawn.py in get_preparation_data(name)
170 # or through direct execution (or to leave it alone entirely)
171 main_module = sys.modules['__main__']
--> 172 main_mod_name = getattr(main_module.__spec__, "name", None)
173 if main_mod_name is not None:
174 d['init_main_from_name'] = main_mod_name
AttributeError: module '__main__' has no attribute '__spec__'