I am adapting the POMO code to run in single-machine, multi-GPU mode.
Let me explain the code logic first:
Each epoch has to process train_num_episode = self.trainer_params['train_episodes'] episodes. Since there is not enough GPU memory to run them all at once, the epoch is split into several mini-batches (the while loop), each of size batch_size = min(self.trainer_params['train_batch_size'], remaining). Even split this way, the memory usage is still high and training is slow, so I want to spread each mini-batch across multiple GPUs: generate batch_size problems, create a DistributedSampler and a DataLoader to shard the data, call set_epoch, and then iterate over the shards one by one (a minimal sketch of this pattern is shown below).
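For reference, this is a minimal, self-contained sketch of the sharding pattern I am aiming for. It is only an illustration: it assumes init_process_group("nccl") has already been called and that the script is launched with one process per GPU (e.g. via torchrun); the names shard_batch and per_gpu_batch_size are made up for this example and are not part of my trainer.

# Minimal sketch (illustration only): shard one generated batch of problems across GPUs.
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader, TensorDataset, DistributedSampler

def shard_batch(batch_data, epoch, per_gpu_batch_size):
    # batch_data: CPU tensor of shape (batch, node, node, 2)
    dataset = TensorDataset(batch_data)
    sampler = DistributedSampler(dataset, shuffle=False)   # one shard per rank
    sampler.set_epoch(epoch)                                # keep ranks in sync across epochs
    loader = DataLoader(dataset, batch_size=per_gpu_batch_size, sampler=sampler)
    device = torch.device("cuda", dist.get_rank())          # single machine: rank == GPU index
    for (chunk,) in loader:
        yield chunk.to(device)                              # move this rank's shard to its GPU

My actual _train_one_epoch looks like this: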
def _train_one_epoch(self,epoch):
score_AM = AverageMeter()
loss_AM = AverageMeter()
train_num_episode = self.trainer_params['train_episodes']
episode = 0
loop_cnt = 0
while episode < train_num_episode:
remaining = train_num_episode - episode
batch_size = min(self.trainer_params['train_batch_size'], remaining)
# generate the problems for this mini-batch (they are then sharded by the DataLoader below)
dis_up, dis_down = self.env.load_problems(batch_size)
# (batch,node,node)->(batch,node,node,2)
batch_data = torch.stack([dis_up, dis_down], dim=-1)
single_batch_size = batch_size // 3  # per-GPU batch size (3 GPU processes here)
# create Dataloader
sampler = torch.utils.data.DistributedSampler(batch_data)
batch_dataloader = torch.utils.data.DataLoader(batch_data,batch_size = single_batch_size,shuffle=False,sampler=sampler)
sampler.set_epoch(epoch)
for batch_idx,batch in enumerate(batch_dataloader):
batch_up = batch[:,:,:,0].to(self.device)
batch_down = batch[:,:,:,1].to(self.device)
# avg_score, avg_loss = self._train_one_batch(batch_size)
current_gpu = torch.cuda.current_device()
avg_score, avg_loss = self._train_one_batch(batch_up, batch_down, current_gpu)
score_AM.update(avg_score, batch_size)
loss_AM.update(avg_loss, batch_size)
dist.barrier()
episode += batch_size
# Log First 10 Batch, only at the first epoch
if epoch == self.start_epoch:
loop_cnt += 1
if loop_cnt <= 10:
self.logger.info('Epoch {:3d}: Train {:3d}/{:3d}({:1.1f}%) Score: {:.4f}, Loss: {:.4f}'
.format(epoch, episode, train_num_episode, 100. * episode / train_num_episode,
score_AM.avg, loss_AM.avg))
# Log Once, for each epoch
self.logger.info('Epoch {:3d}: Train ({:3.0f}%) Score: {:.4f}, Loss: {:.4f}'
.format(epoch, 100. * episode / train_num_episode,
score_AM.avg, loss_AM.avg))
return score_AM.avg, loss_AM.avg
Running this raises the following error, but I don't know how to solve it. Can someone help me?
for batch_idx,batch in enumerate(batch_dataloader):
  File "/home/.conda/envs/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 681, in __next__
    data = self._next_data()
  File "/home/.conda/envs/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 720, in _next_data
    index = self._next_index()  # may raise StopIteration
  File "/home/.conda/envs/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 671, in _next_index
    return next(self._sampler_iter)  # may raise StopIteration
  File "/home/.conda/envs/lib/python3.8/site-packages/torch/utils/data/sampler.py", line 247, in __iter__
    for idx in self.sampler:
  File "/home/.conda/envs/lib/python3.8/site-packages/torch/utils/data/distributed.py", line 101, in __iter__
    indices = torch.randperm(len(self.dataset), generator=g).tolist()  # type: ignore[arg-type]
RuntimeError: Expected a 'cuda' device type for generator but found 'cpu'
ERROR:torch.distributed.elastic.multiprocessing.api:failed (exitcode: 1) local_rank: 0 (pid: 3773341) of binary: /home/.conda/envs/bin/python
The complete code is as follows:
Trainer.py
import torch
from logging import getLogger
import torch.distributed as dist
from torch.utils.data import DataLoader, TensorDataset
from TSPEnv import TSPEnv as Env
from TSPModel import TSPModel as Model
from torch.optim import Adam as Optimizer
from torch.optim.lr_scheduler import MultiStepLR as Scheduler
from utils.utils import *
class TSPTrainer:
def __init__(self,
env_params,
model_params,
optimizer_params,
trainer_params):
# save arguments
self.env_params = env_params
self.model_params = model_params
self.optimizer_params = optimizer_params
self.trainer_params = trainer_params
# saved_models folder, logger
self.logger = getLogger(name='trainer')
self.result_folder = get_result_folder()
self.result_log = LogData()
#
torch.distributed.init_process_group(backend="nccl")
# device: one process per GPU; on a single machine the global rank doubles as the CUDA device index
local_rank = torch.distributed.get_rank()
torch.cuda.set_device(local_rank)
device = torch.device("cuda",local_rank)
self.device = device
# cuda
# USE_CUDA = self.trainer_params['use_cuda']
# if USE_CUDA:
# cuda_device_num = self.trainer_params['cuda_device_num']
# torch.cuda.set_device('cuda:{}'.format(cuda_device_num[0]))
# device = torch.device('cuda', cuda_device_num[0])
# torch.set_default_tensor_type('torch.cuda.FloatTensor')
# else:
# device = torch.device('cpu')
# torch.set_default_tensor_type('torch.FloatTensor')
# Main Components
self.model = Model(**self.model_params)
self.model.to(device)
# self.model.pre_forward() = self.model.pre_forward().to(device)
self.model = torch.nn.parallel.DistributedDataParallel(self.model,device_ids=[local_rank],output_device=local_rank,find_unused_parameters=True)
self.env = Env(**self.env_params)
self.optimizer = Optimizer(self.model.parameters(), **self.optimizer_params['optimizer'])
self.scheduler = Scheduler(self.optimizer, **self.optimizer_params['scheduler'])
# Restore
self.start_epoch = 1
model_load = trainer_params['model_load']
if model_load['enable']:
checkpoint_fullname = '{path}/checkpoint-{epoch}.pt'.format(**model_load)
checkpoint = torch.load(checkpoint_fullname, map_location=device)
self.model.load_state_dict(checkpoint['model_state_dict'])
self.start_epoch = 1 + model_load['epoch']
self.result_log.set_raw_data(checkpoint['result_log'])
self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
self.scheduler.last_epoch = model_load['epoch']-1
self.logger.info('Saved Model Loaded !!')
# utility
self.time_estimator = TimeEstimator()
def run(self):
self.time_estimator.reset(self.start_epoch)
for epoch in range(self.start_epoch, self.trainer_params['epochs']+1):
self.logger.info('=================================================================')
# LR Decay
self.scheduler.step()
# Train
train_score, train_loss = self._train_one_epoch(epoch)
self.result_log.append('train_score', epoch, train_score)
self.result_log.append('train_loss', epoch, train_loss)
############################
# Logs & Checkpoint
############################
elapsed_time_str, remain_time_str = self.time_estimator.get_est_string(epoch, self.trainer_params['epochs'])
self.logger.info("Epoch {:3d}/{:3d}: Time Est.: Elapsed[{}], Remain[{}]".format(
epoch, self.trainer_params['epochs'], elapsed_time_str, remain_time_str))
all_done = (epoch == self.trainer_params['epochs'])
model_save_interval = self.trainer_params['logging']['model_save_interval']
img_save_interval = self.trainer_params['logging']['img_save_interval']
if epoch > 1: # save latest images, every epoch
self.logger.info("Saving log_image")
image_prefix = '{}/latest'.format(self.result_folder)
util_save_log_image_with_label(image_prefix, self.trainer_params['logging']['log_image_params_1'],
self.result_log, labels=['train_score'])
util_save_log_image_with_label(image_prefix, self.trainer_params['logging']['log_image_params_2'],
self.result_log, labels=['train_loss'])
if all_done or (epoch % model_save_interval) == 0:
self.logger.info("Saving trained_model")
checkpoint_dict = {
'epoch': epoch,
'model_state_dict': self.model.state_dict(),
'optimizer_state_dict': self.optimizer.state_dict(),
'scheduler_state_dict': self.scheduler.state_dict(),
'result_log': self.result_log.get_raw_data()
}
torch.save(checkpoint_dict, '{}/checkpoint-{}.pt'.format(self.result_folder, epoch))
if all_done or (epoch % img_save_interval) == 0:
image_prefix = '{}/img/checkpoint-{}'.format(self.result_folder, epoch)
util_save_log_image_with_label(image_prefix, self.trainer_params['logging']['log_image_params_1'],
self.result_log, labels=['train_score'])
util_save_log_image_with_label(image_prefix, self.trainer_params['logging']['log_image_params_2'],
self.result_log, labels=['train_loss'])
if all_done:
self.logger.info(" *** Training Done *** ")
self.logger.info("Now, printing log array...")
util_print_log_array(self.logger, self.result_log)
def _train_one_epoch(self,epoch):
score_AM = AverageMeter()
loss_AM = AverageMeter()
train_num_episode = self.trainer_params['train_episodes']
episode = 0
loop_cnt = 0
while episode < train_num_episode:
remaining = train_num_episode - episode
batch_size = min(self.trainer_params['train_batch_size'], remaining)
#
dis_up, dis_down = self.env.load_problems(batch_size)
# (batch,node,node)->(batch,node,node,2)
batch_data = torch.stack([dis_up, dis_down], dim=-1)
single_batch_size = batch_size // 3  # per-GPU batch size (3 GPU processes here)
# Dataloader
sampler = torch.utils.data.DistributedSampler(batch_data)
batch_dataloader = torch.utils.data.DataLoader(batch_data,batch_size=single_batch_size,shuffle=False,sampler=sampler)
sampler.set_epoch(epoch)
for batch_idx,batch in enumerate(batch_dataloader):
batch_up = batch[:,:,:,0].to(self.device)
batch_down = batch[:,:,:,1].to(self.device)
# avg_score, avg_loss = self._train_one_batch(batch_size)
current_gpu = torch.cuda.current_device()
avg_score, avg_loss = self._train_one_batch(batch_up, batch_down, current_gpu)
score_AM.update(avg_score, batch_size)
loss_AM.update(avg_loss, batch_size)
dist.barrier()
episode += batch_size
# Log First 10 Batch, only at the first epoch
if epoch == self.start_epoch:
loop_cnt += 1
if loop_cnt <= 10:
self.logger.info('Epoch {:3d}: Train {:3d}/{:3d}({:1.1f}%) Score: {:.4f}, Loss: {:.4f}'
.format(epoch, episode, train_num_episode, 100. * episode / train_num_episode,
score_AM.avg, loss_AM.avg))
# Log Once, for each epoch
self.logger.info('Epoch {:3d}: Train ({:3.0f}%) Score: {:.4f}, Loss: {:.4f}'
.format(epoch, 100. * episode / train_num_episode,
score_AM.avg, loss_AM.avg))
return score_AM.avg, loss_AM.avg
def _train_one_batch(self, dis_up, dis_down, current_gpu):
# Prep
###############################################
self.model.train()
batch_size = dis_up.size(0)
#
reset_state_up,reset_state_down, _, _ = self.env.reset(dis_up,dis_down)
device = dis_up.device
prob_list = torch.zeros(size=(batch_size, self.env.pomo_size, 0)).to(device)
# shape: (batch, pomo, 0~)
# POMO Rollout
###############################################
state, reward, done = self.env.pre_step()
while not done:
selected, prob = self.model(state,reset_state_up,reset_state_down)
selected = selected.to(self.device)
prob = prob.to(self.device)
# shape: (batch, pomo)
# state, reward, done = self.env.step(selected)
state, reward, done = self.env.step(selected,current_gpu)
prob_list = torch.cat((prob_list, prob[:, :, None]), dim=2)
# Loss
###############################################
advantage = reward - reward.float().mean(dim=1, keepdims=True)
# shape: (batch, pomo)
log_prob = prob_list.log().sum(dim=2)
# size = (batch, pomo)
loss = -advantage * log_prob # Minus Sign: To Increase REWARD
# shape: (batch, pomo)
loss_mean = loss.mean()
# Score
###############################################
max_pomo_reward, _ = reward.max(dim=1) # get best results from pomo
score_mean = -max_pomo_reward.float().mean() # negative sign to make positive value
# Step & Return
###############################################
self.model.zero_grad()
loss_mean.backward()
self.optimizer.step()
return score_mean.item(), loss_mean.item()
main.py
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--local_rank',
default=-1,
type=int,
help='node rank for distributed training')
args = parser.parse_args()
print(args.local_rank)
def main():
if DEBUG_MODE:
_set_debug_mode()
create_logger(**logger_params)
_print_config()
trainer = Trainer(env_params=env_params,
model_params=model_params,
optimizer_params=optimizer_params,
trainer_params=trainer_params,
)
copy_all_src(trainer.result_folder)
trainer.run()