[ERROR] Implementation of PyTorch DistributedDataParallel

I am working from the POMO code and changing it to run on a single machine with multiple GPUs.

Let me explain the specific code logic:
In each epoch there are train_num_episode = self.trainer_params['train_episodes'] episodes to run. The GPU memory is not enough to run them all at once, so they are processed in several batches (the while loop), with batch_size = min(self.trainer_params['train_batch_size'], remaining). Even split into batches, the memory required is still large and training is slow, so I want to run on multiple GPUs instead: load batch_size problems, create a DistributedSampler and DataLoader to divide the data among the GPUs, set the epoch on the sampler, and then iterate over the batches one by one.
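As background for the method below, here is a tiny standalone sketch (my own illustration with toy shapes, not part of the original code) of how DistributedSampler divides a dataset among ranks. Passing num_replicas and rank explicitly lets it run without an initialized process group; in the real run the sampler reads them from the process group instead.

import torch
from torch.utils.data import DistributedSampler

# Toy data: 12 "problems", each a (node, node, 2) tensor
data = torch.randn(12, 5, 5, 2)

# Each of the 3 ranks receives a disjoint set of 12 / 3 = 4 indices,
# which the DataLoader then cuts into per-rank mini-batches.
for rank in range(3):
    sampler = DistributedSampler(data, num_replicas=3, rank=rank, shuffle=False)
    print(rank, list(iter(sampler)))

The epoch loop itself is: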

def _train_one_epoch(self,epoch):

    score_AM = AverageMeter()
    loss_AM = AverageMeter()

    train_num_episode = self.trainer_params['train_episodes']
    episode = 0
    loop_cnt = 0
    while episode < train_num_episode:

        remaining = train_num_episode - episode
        batch_size = min(self.trainer_params['train_batch_size'], remaining)
        # load every epoch to DataLoader
        dis_up, dis_down = self.env.load_problems(batch_size)
        # (batch,node,node)->(batch,node,node,2) 
        batch_data = torch.stack([dis_up, dis_down], dim=-1)
        single_batch_size = batch_size // 3

        # create Dataloader
        sampler = torch.utils.data.DistributedSampler(batch_data)
        batch_dataloader = torch.utils.data.DataLoader(batch_data,batch_size = single_batch_size,shuffle=False,sampler=sampler)

        sampler.set_epoch(epoch)
        for batch_idx,batch in enumerate(batch_dataloader):
            batch_up = batch[:,:,:,0].to(self.device)
            batch_down = batch[:,:,:,1].to(self.device)
            # avg_score, avg_loss = self._train_one_batch(batch_size)
            current_gpu = torch.cuda.current_device()
            avg_score, avg_loss = self._train_one_batch(batch_up, batch_down, current_gpu)
            score_AM.update(avg_score, batch_size)
            loss_AM.update(avg_loss, batch_size)

        dist.barrier()
        episode += batch_size

        # Log First 10 Batch, only at the first epoch
        if epoch == self.start_epoch:
            loop_cnt += 1
            if loop_cnt <= 10:
                self.logger.info('Epoch {:3d}: Train {:3d}/{:3d}({:1.1f}%)  Score: {:.4f},  Loss: {:.4f}'
                                 .format(epoch, episode, train_num_episode, 100. * episode / train_num_episode,
                                         score_AM.avg, loss_AM.avg))

    # Log Once, for each epoch
    self.logger.info('Epoch {:3d}: Train ({:3.0f}%)  Score: {:.4f},  Loss: {:.4f}'
                     .format(epoch, 100. * episode / train_num_episode,
                             score_AM.avg, loss_AM.avg))

    return score_AM.avg, loss_AM.avg

It raises an error, but I don’t know how to solve it. Can someone help me?

    for batch_idx,batch in enumerate(batch_dataloader):
  File "/home/.conda/envs/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 681, in __next__
    data = self._next_data()
  File "/home/.conda/envs/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 720, in _next_data
    index = self._next_index()  # may raise StopIteration
  File "/home/.conda/envs/lib/python3.8/site-packages/torch/utils/data/dataloader.py", line 671, in _next_index
    return next(self._sampler_iter)  # may raise StopIteration
  File "/home/.conda/envs/lib/python3.8/site-packages/torch/utils/data/sampler.py", line 247, in __iter__
    for idx in self.sampler:
  File "/home/.conda/envs/lib/python3.8/site-packages/torch/utils/data/distributed.py", line 101, in __iter__
    indices = torch.randperm(len(self.dataset), generator=g).tolist()  # type: ignore[arg-type]
RuntimeError: Expected a 'cuda' device type for generator but found 'cpu'
ERROR:torch.distributed.elastic.multiprocessing.api:failed (exitcode: 1) local_rank: 0 (pid: 3773341) of binary: /home/.conda/envs/bin/python

The complete code implementation is as follows:

Trainer.py

import torch
from logging import getLogger
import torch.distributed as dist
from torch.utils.data import DataLoader, TensorDataset

from TSPEnv import TSPEnv as Env
from TSPModel import TSPModel as Model

from torch.optim import Adam as Optimizer
from torch.optim.lr_scheduler import MultiStepLR as Scheduler

from utils.utils import *

class TSPTrainer:
    def __init__(self,
                 env_params,
                 model_params,
                 optimizer_params,
                 trainer_params):

        # save arguments
        self.env_params = env_params
        self.model_params = model_params
        self.optimizer_params = optimizer_params
        self.trainer_params = trainer_params

        # saved_models folder, logger
        self.logger = getLogger(name='trainer')
        self.result_folder = get_result_folder()
        self.result_log = LogData()

        # init the process group
        torch.distributed.init_process_group(backend="nccl")
        # device
        local_rank = torch.distributed.get_rank()
        torch.cuda.set_device(local_rank)

        device = torch.device("cuda",local_rank)
        self.device = device
        # cuda

        # USE_CUDA = self.trainer_params['use_cuda']
        # if USE_CUDA:
        #     cuda_device_num = self.trainer_params['cuda_device_num']
        #     torch.cuda.set_device('cuda:{}'.format(cuda_device_num[0]))
        #     device = torch.device('cuda', cuda_device_num[0])
        #     torch.set_default_tensor_type('torch.cuda.FloatTensor')
        # else:
        #     device = torch.device('cpu')
        #     torch.set_default_tensor_type('torch.FloatTensor')

        # Main Components
        self.model = Model(**self.model_params)
        self.model.to(device)

        # self.model.pre_forward() = self.model.pre_forward().to(device)
        self.model = torch.nn.parallel.DistributedDataParallel(self.model,device_ids=[local_rank],output_device=local_rank,find_unused_parameters=True)


        self.env = Env(**self.env_params)
        self.optimizer = Optimizer(self.model.parameters(), **self.optimizer_params['optimizer'])
        self.scheduler = Scheduler(self.optimizer, **self.optimizer_params['scheduler'])

        # Restore
        self.start_epoch = 1
        model_load = trainer_params['model_load']
        if model_load['enable']:
            checkpoint_fullname = '{path}/checkpoint-{epoch}.pt'.format(**model_load)
            checkpoint = torch.load(checkpoint_fullname, map_location=device)
            self.model.load_state_dict(checkpoint['model_state_dict'])
            self.start_epoch = 1 + model_load['epoch']
            self.result_log.set_raw_data(checkpoint['result_log'])
            self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
            self.scheduler.last_epoch = model_load['epoch']-1
            self.logger.info('Saved Model Loaded !!')

        # utility
        self.time_estimator = TimeEstimator()

    def run(self):
        self.time_estimator.reset(self.start_epoch)
        for epoch in range(self.start_epoch, self.trainer_params['epochs']+1):
            self.logger.info('=================================================================')

            # LR Decay
            self.scheduler.step()

            # Train
            train_score, train_loss = self._train_one_epoch(epoch)
            self.result_log.append('train_score', epoch, train_score)
            self.result_log.append('train_loss', epoch, train_loss)

            ############################
            # Logs & Checkpoint
            ############################
            elapsed_time_str, remain_time_str = self.time_estimator.get_est_string(epoch, self.trainer_params['epochs'])
            self.logger.info("Epoch {:3d}/{:3d}: Time Est.: Elapsed[{}], Remain[{}]".format(
                epoch, self.trainer_params['epochs'], elapsed_time_str, remain_time_str))

            all_done = (epoch == self.trainer_params['epochs'])
            model_save_interval = self.trainer_params['logging']['model_save_interval']
            img_save_interval = self.trainer_params['logging']['img_save_interval']

            if epoch > 1:  # save latest images, every epoch
                self.logger.info("Saving log_image")
                image_prefix = '{}/latest'.format(self.result_folder)
                util_save_log_image_with_label(image_prefix, self.trainer_params['logging']['log_image_params_1'],
                                    self.result_log, labels=['train_score'])
                util_save_log_image_with_label(image_prefix, self.trainer_params['logging']['log_image_params_2'],
                                    self.result_log, labels=['train_loss'])

            if all_done or (epoch % model_save_interval) == 0:
                self.logger.info("Saving trained_model")
                checkpoint_dict = {
                    'epoch': epoch,
                    'model_state_dict': self.model.state_dict(),
                    'optimizer_state_dict': self.optimizer.state_dict(),
                    'scheduler_state_dict': self.scheduler.state_dict(),
                    'result_log': self.result_log.get_raw_data()
                }
                torch.save(checkpoint_dict, '{}/checkpoint-{}.pt'.format(self.result_folder, epoch))

            if all_done or (epoch % img_save_interval) == 0:
                image_prefix = '{}/img/checkpoint-{}'.format(self.result_folder, epoch)
                util_save_log_image_with_label(image_prefix, self.trainer_params['logging']['log_image_params_1'],
                                    self.result_log, labels=['train_score'])
                util_save_log_image_with_label(image_prefix, self.trainer_params['logging']['log_image_params_2'],
                                    self.result_log, labels=['train_loss'])

            if all_done:
                self.logger.info(" *** Training Done *** ")
                self.logger.info("Now, printing log array...")
                util_print_log_array(self.logger, self.result_log)

    def _train_one_epoch(self,epoch):

        score_AM = AverageMeter()
        loss_AM = AverageMeter()

        train_num_episode = self.trainer_params['train_episodes']
        episode = 0
        loop_cnt = 0
        while episode < train_num_episode:

            remaining = train_num_episode - episode
            batch_size = min(self.trainer_params['train_batch_size'], remaining)
            #
            dis_up, dis_down = self.env.load_problems(batch_size)
            # (batch,node,node)->(batch,node,node,2)
            batch_data = torch.stack([dis_up, dis_down], dim=-1)
            single_batch_size = batch_size // 3

            # Dataloader
            sampler = torch.utils.data.DistributedSampler(batch_data)
            batch_dataloader = torch.utils.data.DataLoader(batch_data,batch_size=single_batch_size,shuffle=False,sampler=sampler)

            sampler.set_epoch(epoch)
            for batch_idx,batch in enumerate(batch_dataloader):
                batch_up = batch[:,:,:,0].to(self.device)
                batch_down = batch[:,:,:,1].to(self.device)
                # avg_score, avg_loss = self._train_one_batch(batch_size)
                current_gpu = torch.cuda.current_device()
                avg_score, avg_loss = self._train_one_batch(batch_up, batch_down, current_gpu)
                score_AM.update(avg_score, batch_size)
                loss_AM.update(avg_loss, batch_size)

            dist.barrier()
            episode += batch_size

            # Log First 10 Batch, only at the first epoch
            if epoch == self.start_epoch:
                loop_cnt += 1
                if loop_cnt <= 10:
                    self.logger.info('Epoch {:3d}: Train {:3d}/{:3d}({:1.1f}%)  Score: {:.4f},  Loss: {:.4f}'
                                     .format(epoch, episode, train_num_episode, 100. * episode / train_num_episode,
                                             score_AM.avg, loss_AM.avg))

        # Log Once, for each epoch
        self.logger.info('Epoch {:3d}: Train ({:3.0f}%)  Score: {:.4f},  Loss: {:.4f}'
                         .format(epoch, 100. * episode / train_num_episode,
                                 score_AM.avg, loss_AM.avg))

        return score_AM.avg, loss_AM.avg

    def _train_one_batch(self, dis_up, dis_down, current_gpu):

        # Prep
        ###############################################
        self.model.train()
        batch_size = dis_up.size(0)
        #
        reset_state_up,reset_state_down, _, _ = self.env.reset(dis_up,dis_down)

        device = dis_up.device
        prob_list = torch.zeros(size=(batch_size, self.env.pomo_size, 0)).to(device)
        # shape: (batch, pomo, 0~)

        # POMO Rollout
        ###############################################
        state, reward, done = self.env.pre_step()
        while not done:
            selected, prob = self.model(state,reset_state_up,reset_state_down)
            selected = selected.to(self.device)
            prob = prob.to(self.device)

            # shape: (batch, pomo)
            # state, reward, done = self.env.step(selected)
            state, reward, done = self.env.step(selected,current_gpu)
            prob_list = torch.cat((prob_list, prob[:, :, None]), dim=2)

        # Loss
        ###############################################
        advantage = reward - reward.float().mean(dim=1, keepdims=True)
        # shape: (batch, pomo)
        log_prob = prob_list.log().sum(dim=2)
        # size = (batch, pomo)
        loss = -advantage * log_prob  # Minus Sign: To Increase REWARD
        # shape: (batch, pomo)
        loss_mean = loss.mean()

        # Score
        ###############################################
        max_pomo_reward, _ = reward.max(dim=1)  # get best results from pomo
        score_mean = -max_pomo_reward.float().mean()  # negative sign to make positive value

        # Step & Return
        ###############################################
        self.model.zero_grad()
        loss_mean.backward()
        self.optimizer.step()
        return score_mean.item(), loss_mean.item()

main.py

import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--local_rank',
                    default=-1,
                    type=int,
                    help='node rank for distributed training')
args = parser.parse_args()

print(args.local_rank)

def main():
    if DEBUG_MODE:
        _set_debug_mode()

    create_logger(**logger_params)
    _print_config()

    trainer = Trainer(env_params=env_params,
                      model_params=model_params,
                      optimizer_params=optimizer_params,
                      trainer_params=trainer_params,
                      )

    copy_all_src(trainer.result_folder)

    trainer.run()

I guess you are setting the default device to cuda somewhere, which will raise this error:

import torch
from torch.utils.data import TensorDataset

dataset = TensorDataset(torch.randn(10, 1))

# works
g = torch.Generator()
g.manual_seed(0)
indices = torch.randperm(len(dataset), generator=g).tolist()

# breaks
torch.set_default_device("cuda")
indices = torch.randperm(len(dataset), generator=g).tolist()
# RuntimeError: Expected a 'cuda' device type for generator but found 'cpu'
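
That is exactly what DistributedSampler does internally: when shuffle=True (the default), its __iter__ creates a CPU torch.Generator and passes it to torch.randperm, which is the distributed.py line in your traceback. Besides removing whatever sets the default device to cuda, one possible workaround is to disable the sampler's shuffling so that randperm is never reached. This is only a sketch and assumes the shuffling is not needed (load_problems already draws fresh random instances every iteration):

sampler = torch.utils.data.DistributedSampler(batch_data, shuffle=False)
batch_dataloader = torch.utils.data.DataLoader(batch_data, batch_size=single_batch_size, sampler=sampler)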

I’m sorry, do you mean I can’t use the statement torch.set_default_device("cuda")?

I did use that statement in my earlier code:

        # init
        torch.distributed.init_process_group(backend="nccl")
        # set the device
        local_rank = torch.distributed.get_rank()
        torch.cuda.set_device(local_rank)

        device = torch.device("cuda",local_rank)
        self.device = device

How can I modify it?

I tried to remove this statement from my code, but sadly it still returned the same error.

The code works for me without this statement and only fails with it, so I guess you might be setting it somewhere else.
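
If it is not obvious where the default is being changed, a quick diagnostic (just a sketch; drop it in right before the DataLoader loop) is to check which device factory functions use by default:

import torch

# A tensor created without an explicit device reveals the current default.
# Expected: "cpu". If it prints "cuda:<n>", something earlier in the process
# called torch.set_default_device("cuda") or
# torch.set_default_tensor_type('torch.cuda.FloatTensor').
print(torch.empty(0).device)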