RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one

I have a class which inherits from MyModel (parent class) which is a relatively large model :

class DynamicModel(MyModel):
    def __init__(self, num_inputs, num_outputs, h_dim=96, z_dim=48, n_layers=2, n_mixtures=10, device= torch.device('cuda' if torch.cuda.is_available() else 'cpu'), normalizer_input=None, normalizer_output=None,
                 *args, **kwargs):
        super(DynamicModel, self).__init__(u_dim=num_inputs,y_dim= num_outputs, h_dim=h_dim, z_dim=z_dim, n_layers=n_layers, n_mixtures=n_mixtures, device=device)
        # Save parameters
        self.num_inputs = num_inputs
        self.num_outputs = num_outputs
        self.args = args
        self.kwargs = kwargs
        self.normalizer_input = normalizer_input
        self.normalizer_output = normalizer_output
        
        self.to(device)

    @property
    def num_model_inputs(self):
        return self.num_inputs + self.num_outputs if self.ar else self.num_inputs

    def forward(self, u, y=None):
        if self.normalizer_input is not None:
            u = self.normalizer_input.normalize(u)
        if y is not None and self.normalizer_output is not None:
            y = self.normalizer_output.normalize(y)

        loss =super(DynamicModel, self).forward(u, y)
        return loss

    def generate(self, u, y=None):
        if self.normalizer_input is not None:
            u = self.normalizer_input.normalize(u)

        y_sample, y_sample_mu, y_sample_sigma = super(DynamicModel, self).generate(u)

        if self.normalizer_output is not None:
            y_sample = self.normalizer_output.unnormalize(y_sample)
        if self.normalizer_output is not None:
            y_sample_mu = self.normalizer_output.unnormalize_mean(y_sample_mu)
        if self.normalizer_output is not None:
            y_sample_sigma = self.normalizer_output.unnormalize_sigma(y_sample_sigma)

        return y_sample, y_sample_mu, y_sample_sigma

class ModelState:
    def __init__(self,
                 seed,
                 nu,
                 ny,
                 h_dim=56,
                 z_dim=48,
                 n_layers=2,
                 n_mixtures=8,
                 device= torch.device('cuda' if torch.cuda.is_available() else 'cpu'),
                 optimizer_type= "AdamW",
                 **kwargs):

        torch.manual_seed(seed)

        self.h_dim=h_dim
        self.z_dim=z_dim
        self.n_layers=n_layers
        self.n_mixtures= n_mixtures

        self.model = DynamicModel( num_inputs=nu, num_outputs=ny, h_dim=h_dim, z_dim=z_dim, n_layers=n_layers, n_mixtures=n_mixtures, device=device, **kwargs)
        if optimizer_type == "AdaBelief":
            self.optimizer = torch_optimizer.AdaBelief(self.model.parameters(),
                                                       lr= 1e-4,
                                                      betas=(0.9, 0.999),
                                                      eps=1e-6,
                                                      weight_decay=0
                                                      )
        elif optimizer_type=="AdamW":
            self.optimizer = torch.optim.AdamW(self.model.parameters(),
                                               lr= 1e-4,
                                               betas=(0.9, 0.999)
                                              )
    
        else:
            # Optimization parameters
            yogi = torch_optimizer.Yogi(self.model.parameters(), lr= 0.5e-4, betas=(0.95, 0.999), eps=1e-3, initial_accumulator=1e-6, weight_decay=0,)

            self.optimizer = torch_optimizer.Lookahead(yogi, k=5, alpha=0.5)


    def load_model(self, path, name='my_model.pt', map_location=None):
        file = path if os.path.isfile(path) else os.path.join(path, name)
        try:
            if map_location is None:
               ckpt = torch.load(file, map_location=lambda storage, loc: storage)
            else:
               ckpt = torch.load(file, map_location=map_location)
        except NotADirectoryError:
            raise Exception("Could not find model: " + file)
        self.model.load_state_dict(ckpt["model"])
        self.optimizer.load_state_dict(ckpt["optimizer"])
        epoch = ckpt['epoch']
        return epoch

    def save_model(self, epoch, vloss, elapsed_time,  path, name='my_model.pt'):
        if not os.path.exists(path):
            os.makedirs(path)
        torch.save({
                'epoch': epoch,
                'model': self.model.state_dict(),
                'optimizer': self.optimizer.state_dict(),
                'vloss': vloss,
                'elapsed_time': elapsed_time,
            },
            os.path.join(path, name))

Next I tried to use DistributedDataParallel to train my model on multiple GPUs.


    batch_size = args.batch_size
    train_dataset = TensorDataset(train_x, train_y)
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
                                                                    num_replicas=args.world_size,
                                                                    rank=rank,
                                                                    shuffle=True,
                                                                    )

    train_loader = DataLoader(train_dataset, 
                              batch_size=batch_size , 
                              shuffle=False, 
                              num_workers=args.num_workers, 
                              sampler=train_sampler,
                              worker_init_fn=seed_worker,
                              generator=g,)

    normalizer_input, normalizer_output = compute_normalizer(train_loader)

    # Define model
    modelstate = ModelState(seed=seed,
                            nu=u_dim,
                            ny=y_dim,
                            #normalizer_input=normalizer_input,
                            #normalizer_output=normalizer_output
                            )
    modelstate.model.cuda()
    modelstate.model = torch.nn.parallel.DistributedDataParallel(modelstate.model, device_ids=[current_device])
    print('passed distributed data parallel call')

When I trained the model with the distributed data on a cluster before finishing the first epoch, this error message interrupted the training process

    loss_ = modelstate.model(u, y)
  File "/home/lib/python3.8/site-packages/torch/nn/modules/module.py", line 1194, in _call_impl
    return forward_call(*input, **kwargs)
  File "/home/lib/python3.8/site-packages/torch/nn/parallel/distributed.py", line 1026, in forward
    if torch.is_grad_enabled() and self.reducer._rebuild_buckets():
RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by 
making sure all `forward` function outputs participate in calculating loss. 
If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return value of `forward` of your module when reporting this issue (e.g. list, dict, iterable).
Parameter indices which did not receive grad for rank 9: 12 13 26 27 79 80 81 82 83 84 85 86 87 88
 In addition, you can set the environment variable TORCH_DISTRIBUTED_DEBUG to either INFO or DETAIL to print out information about which particular parameters did not receive gradient on this rank as part of this error
Traceback (most recent call last):

I would truely appreciate if someone could provide a solution.

Does “passing the keyword argument find_unused_parameters=True to torch.nn.parallel.DistributedDataParallel” help by any chance?

Hi, I ran into this bug recently. I have tried the solution recommended by @XWu , and it is starting to train. However, there are other tricks too like multiplying the unused_params by 0 and others, check this Github issues thread for more: