Reasons why after `loss.backward()` gradients in model.parameters() are None (distributed training with Dask)

I am working on distributed training using Dask. So far, I have been able to train successfully with Dask's concurrent.futures interface. What I basically do is:

  1. Create a list with batches of data points that I need for training the model.
  2. Call a train_batches function using concurrent.futures with a distributed scheduler.
  3. Wait for the futures, gather their results, and accumulate the gradients they return.
  4. Set the gradients.
  5. And finally, call optimizer.step() (sketched just below).
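
In outline, one epoch looks roughly like the sketch below (placeholder names such as batches and targets; the full, runnable version is further down in this thread):

    optimizer.zero_grad()

    # 1.-2. submit one future per batch of data
    futures = [client.submit(train_batches, index, chunk, targets, model)
               for index, chunk in enumerate(batches)]

    # 3. wait for the futures and gather the gradients they return
    dask.distributed.wait(futures)
    results = client.gather(futures)

    # 4. sum the per-batch gradients and set them on the local model
    summed = [sum(grads) for grads in zip(*(r[2] for r in results))]
    for param, grad in zip(model.parameters(), summed):
        param.grad = torch.tensor(grad)

    # 5. take the optimizer step
    optimizer.step()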

I think I can distribute the calculations in my code even further, because forward() receives a dictionary that I have to iterate over:

    def forward(self, X):
        """Forward propagation
        This is forward propagation and it returns the atomic energy.
        Parameters
        ----------
        X : dict
            Dictionary of inputs in the feature space, keyed by image hash.
        Returns
        -------
        outputs : tensor
            Stacked tensor with one energy per image.
        """

        outputs = []

        for hash in X:
            image = X[hash]
            atomic_energies = []

            for symbol, x in image:
                x = self.linears[symbol](x)

                intercept_name = 'intercept_' + symbol
                slope_name = 'slope_' + symbol
                slope = getattr(self, slope_name)
                intercept = getattr(self, intercept_name)

                x = (slope * x) + intercept
                atomic_energies.append(x)

            atomic_energies = torch.cat(atomic_energies)
            image_energy = torch.sum(atomic_energies)
            outputs.append(image_energy)
        outputs = torch.stack(outputs)
        return outputs
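
Schematically, the dictionary that forward() receives looks something like this (values invented just to illustrate the structure: keys are image hashes, and each value is a list of (symbol, feature tensor) pairs, one per atom):

    X = {
        'hash_of_image_1': [('H', torch.tensor([[0.1, 0.2]])),
                            ('O', torch.tensor([[0.3, 0.4]])),
                            ('H', torch.tensor([[0.5, 0.6]]))],
        'hash_of_image_2': [('Cu', torch.tensor([[0.7, 0.8]])),
                            ('Cu', torch.tensor([[0.9, 1.0]]))],
    }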

I am able to get futures that compute the outputs without problems. However, when I compute the loss, call loss.backward(), and run this loop (which works when I don’t mess with forward()):

for param in model.parameters():
    gradients.append(param.grad.detach().numpy())

it raises an error because the model gradients are None. What could be the reason the model is not getting gradients? I would appreciate any discussion on this.

I’m not familiar with Dask and based on your description it seems you are using some kind of distributed setup.
If the gradients are None it’s usually a sign that Autograd hasn’t calculated these gradients yet. After the first iteration they should be valid or all zeros.
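
To illustrate: before the first backward() call, every parameter's .grad is simply None, so any code that calls param.grad.detach() unconditionally will fail. A minimal standalone check:

    import torch

    model = torch.nn.Linear(1, 1)
    print([p.grad for p in model.parameters()])   # [None, None] before backward()

    loss = model(torch.rand(4, 1)).sum()
    loss.backward()
    print([p.grad for p in model.parameters()])   # now two gradient tensors

    # Guarding against None avoids the AttributeError:
    grads = [p.grad.detach().numpy() for p in model.parameters() if p.grad is not None]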
You’ve mentioned that your code works if you don’t “mess” with forward.
Could you explain that a bit: what changes did you make that seem to cause the trouble?

Hi @ptrblck :slight_smile:

Yes, you are right. Dask is a library that makes it very easy to run distributed calculations.

I will need to check that.

I have prepared some small scripts to reproduce this. Suppose you have this linear regression:

import torch
import numpy as np

class NeuralNetwork(torch.nn.Module):

    def __init__(self, input_size, output_size):
        super(NeuralNetwork, self).__init__()
        self.linear1 = torch.nn.Linear(input_size, 10)
        self.linear2 = torch.nn.Linear(10, output_size)

    def forward(self, X):
        outputs = self.linear1(X)
        outputs = self.linear2(outputs)
        return outputs


# Data
x = np.random.rand(10, 1)
y = np.random.rand(10, 1)
x = torch.tensor(x, dtype=torch.float)
y = torch.tensor(y, dtype=torch.float)

# Model stuff
model = NeuralNetwork(1, 1)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-6)

# Training

for epoch in range(10000):
    optimizer.zero_grad()
    outputs = model(x)
    criterion = torch.nn.MSELoss()
    loss = criterion(outputs, y)
    loss.backward()
    optimizer.step()
    print(epoch, loss.item())

Using Dask, I can do distributed training if I modify that code this way:

import numpy as np
import torch
import dask
from dask.distributed import Client, LocalCluster


def get_chunks(sequence, chunk_size, svm=True):
    """A function that yields a list in chunks"""
    res = []

    if svm is False and isinstance(sequence, dict):
        sequence = sequence.items()

    for item in sequence:
        res.append(item)

        if len(res) >= chunk_size:
            yield res
            res = []
    if res:
        yield res  # yield the last, incomplete, portion

def train_batches(index, chunk, targets, model):
    """Compute loss and gradients for a single batch (runs on a Dask worker)"""
    outputs = model(chunk)
    criterion = torch.nn.MSELoss()
    loss = criterion(outputs, targets[index])
    loss.backward()

    # Collect this batch's gradients so they can be shipped back to the client
    gradients = []
    for param in model.parameters():
        gradients.append(param.grad.detach().numpy())

    return outputs, loss, gradients

class NeuralNetwork(torch.nn.Module):

    def __init__(self, input_size, output_size):
        super(NeuralNetwork, self).__init__()
        self.linear1 = torch.nn.Linear(input_size, 10)
        self.linear2 = torch.nn.Linear(10, output_size)

    def forward(self, X):
        outputs = self.linear1(X)
        outputs = self.linear2(outputs)
        return outputs


if __name__ == '__main__':
    cluster = LocalCluster(n_workers=4, threads_per_worker=2)
    client = Client(cluster)

    # Data
    x = np.random.rand(10, 1)
    y = np.random.rand(10, 1)

    x = list(get_chunks(x, 5, svm=False))
    y = list(get_chunks(y, 5, svm=False))

    x = torch.tensor(x, dtype=torch.float)
    y = torch.tensor(y, dtype=torch.float)

    # Model stuff
    model = NeuralNetwork(1, 1)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-6)

    # Training

    for epoch in range(10000):
        optimizer.zero_grad()

        # Submit one future per chunk of data
        accumulation = []
        for index, chunk in enumerate(x):
            accumulation.append(client.submit(train_batches, *(index, chunk, y,
                                                               model)))
        dask.distributed.wait(accumulation)
        accumulation = client.gather(accumulation)

        # Collect the outputs, losses, and gradients returned by the workers
        losses = []
        outputs = []
        grads = []
        for index, chunk in enumerate(accumulation):
            outputs = chunk[0]
            loss = chunk[1]
            losses.append(loss)
            grad = np.array(chunk[2])
            grads.append(grad)

        # Sum the per-batch gradients and set them on the local model's parameters
        grads = sum(grads)

        for index, param in enumerate(model.parameters()):
            param.grad = torch.tensor(grads[index])

        loss = sum(losses)

        optimizer.step()
        print(epoch, loss)

This works: one can see the loss decreasing with each epoch.

If I go deeper and add a client.submit() inside forward():

    def forward(self, X):
        client = dask.distributed.get_client()
        outputs = self.linear1(X)
        outputs = client.submit(self.linear2, outputs)
        dask.distributed.wait(outputs)
        outputs = client.gather(outputs)
        return outputs

Then PyTorch does not create the gradients, even though the outputs are computed correctly:

    result[0] = yield future
  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/home/muammar/.local/lib/python3.7/site-packages/distributed-1.26.0+17.gd9143a0e-py3.7.egg/distributed/client.py", line 1497, in _gather
    traceback)
  File "/usr/lib/python3/dist-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "train.py", line 31, in train_batches
    gradients.append(param.grad.detach().numpy())
AttributeError: 'NoneType' object has no attribute 'detach'
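
One thing I still need to check (a diagnostic sketch, not part of the script above) is whether the tensor that client.gather() returns inside forward() is still attached to the autograd graph. A tensor that is serialized to a worker process and back is rebuilt locally, and I suspect it comes back without its grad_fn, which would leave the parameters out of the backward pass:

    def forward(self, X):
        client = dask.distributed.get_client()
        outputs = self.linear1(X)
        print(outputs.grad_fn)    # a valid grad_fn from the local computation

        outputs = client.submit(self.linear2, outputs)
        dask.distributed.wait(outputs)
        outputs = client.gather(outputs)
        print(outputs.grad_fn)    # if this prints None, the result was pickled on
                                  # the worker and rebuilt here, detached from the
                                  # graph of the local parameters
        return outputs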

For this toy example, doing this is pointless. However, for the dictionary-based forward() I showed in the first post, I would expect it to be faster. What do you think about this? Do you have any idea why it is failing?