Reasons why after `loss.backward()` gradients in model.parameters() are None (distributed training with Dask)

Hi @ptrblck :slight_smile:

Yes, you are right. Dask is a library that allows doing distributed calculations in a very nice and easy way.

I will need to check that.

I have prepared some small code examples to reproduce it. Suppose you have this linear regression:

import torch
import numpy as np

class NeuralNetwork(torch.nn.Module):
    """A two-layer fully connected network: input -> 10 hidden units -> output."""

    def __init__(self, input_size, output_size):
        super(NeuralNetwork, self).__init__()
        self.linear1 = torch.nn.Linear(input_size, 10)
        self.linear2 = torch.nn.Linear(10, output_size)

    def forward(self, X):
        """Map a batch X through both linear layers (no nonlinearity)."""
        hidden = self.linear1(X)
        return self.linear2(hidden)


# Data: 10 random (x, y) samples for a toy regression problem.
x = np.random.rand(10, 1)
y = np.random.rand(10, 1)
x = torch.tensor(x, dtype=torch.float)
y = torch.tensor(y, dtype=torch.float)

# Model stuff
model = NeuralNetwork(1, 1)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-6)
# MSELoss is stateless: build it once instead of re-constructing it
# inside the loop on every epoch.
criterion = torch.nn.MSELoss()

# Training

for epoch in range(10000):
    optimizer.zero_grad()
    outputs = model(x)
    loss = criterion(outputs, y)
    loss.backward()
    optimizer.step()
    print(epoch, loss.item())

Using dask, I can do distributed training if I modify that code this way:

import numpy as np
import torch

import dask
from dask.distributed import Client, LocalCluster


def get_chunks(sequence, chunk_size, svm=True):
    """Yield the items of *sequence* grouped into lists of at most chunk_size.

    The final chunk may be shorter. When svm is False and *sequence* is a
    dict, its (key, value) pairs are chunked instead of its keys.
    """
    if svm is False and isinstance(sequence, dict):
        sequence = sequence.items()

    buffered = []
    for element in sequence:
        buffered.append(element)
        if len(buffered) < chunk_size:
            continue
        yield buffered
        buffered = []
    if buffered:
        # Emit the trailing, possibly short, chunk.
        yield buffered

def train_batches(index, chunk, targets, model):
    """Run one forward/backward pass of *model* on a single chunk.

    Computes the MSE loss of model(chunk) against targets[index],
    backpropagates it, and returns (outputs, loss, gradients) where
    gradients is a list of per-parameter numpy arrays.
    """
    criterion = torch.nn.MSELoss()
    predictions = model(chunk)
    loss = criterion(predictions, targets[index])
    loss.backward()

    # Detach each parameter's gradient into a plain numpy array so the
    # result is picklable across processes.
    gradients = [param.grad.detach().numpy() for param in model.parameters()]

    return predictions, loss, gradients

class NeuralNetwork(torch.nn.Module):
    """Two stacked linear layers (input -> 10 -> output), no activation."""

    def __init__(self, input_size, output_size):
        super(NeuralNetwork, self).__init__()
        self.linear1 = torch.nn.Linear(input_size, 10)
        self.linear2 = torch.nn.Linear(10, output_size)

    def forward(self, X):
        """Return the composition linear2(linear1(X)) for a batch X."""
        return self.linear2(self.linear1(X))


if __name__ == '__main__':
    # Spin up a local Dask cluster (4 worker processes x 2 threads each)
    # and connect a client to it.
    cluster = LocalCluster(n_workers=4, threads_per_worker=2)
    client = Client(cluster)

    # Data: 10 random (x, y) samples, split into chunks of 5 so that each
    # chunk can be trained on a different worker.
    x = np.random.rand(10, 1)
    y = np.random.rand(10, 1)

    x = list(get_chunks(x, 5, svm=False))
    y = list(get_chunks(y, 5, svm=False))

    x = torch.tensor(x, dtype=torch.float)
    y = torch.tensor(y, dtype=torch.float)

    # Model stuff
    model = NeuralNetwork(1, 1)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-6)

    # Training

    for epoch in range(10000):
        optimizer.zero_grad()

        # Submit one train_batches task per chunk; each task receives a
        # (serialized) copy of the model, so gradients computed on the
        # workers never touch the local model directly.
        accumulation = []
        for index, chunk in enumerate(x):
            accumulation.append(client.submit(train_batches, *(index, chunk, y,
                                                               model)))
        dask.distributed.wait(accumulation)
        accumulation = client.gather(accumulation)

        # Collect per-chunk (outputs, loss, gradients) results.
        # NOTE(review): `outputs` is rebound (not appended) on every
        # iteration, so it ends up holding only the last chunk's outputs
        # and is unused afterwards — confirm whether that is intended.
        losses = []
        outputs = []
        grads = []
        for index, chunk in enumerate(accumulation):
            outputs = chunk[0]
            loss = chunk[1]
            losses.append(loss)
            grad = np.array(chunk[2])
            grads.append(grad)

        # Element-wise sum of the per-chunk gradient arrays: summing the
        # arrays adds corresponding parameter gradients across chunks.
        grads = sum(grads)

        # Copy the combined gradients back into the local model so the
        # optimizer step below can use them.
        for index, param in enumerate(model.parameters()):
            param.grad = torch.tensor(grads[index])

        # Sum of the per-chunk losses, used only for reporting.
        loss = sum(losses)

        optimizer.step()
        print(epoch, loss)

This appears to work: one can see the loss decreasing with each epoch.

If I go deeper and add a client.submit() inside forward():

    def forward(self, X):
        """Forward pass that offloads the second layer to a Dask worker."""
        # Get the client owned by the currently running worker/task.
        client = dask.distributed.get_client()
        outputs = self.linear1(X)
        # NOTE(review): submitting the layer call ships the tensor to a
        # worker process; the result gathered back is presumably a fresh
        # tensor detached from the local autograd graph, which would
        # explain why .grad ends up None after backward() — verify.
        outputs = client.submit(self.linear2, outputs)
        dask.distributed.wait(outputs)
        outputs = client.gather(outputs)
        return outputs

Then PyTorch does not create the gradients, even though the outputs are computed correctly:

    result[0] = yield future
  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/gen.py", line 736, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/home/muammar/.local/lib/python3.7/site-packages/distributed-1.26.0+17.gd9143a0e-py3.7.egg/distributed/client.py", line 1497, in _gather
    traceback)
  File "/usr/lib/python3/dist-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "train.py", line 31, in train_batches
    gradients.append(param.grad.detach().numpy())
AttributeError: 'NoneType' object has no attribute 'detach'

For this toy example, doing this is useless. However, for the dictionary in the forward() I showed in the first post, I would expect this to be faster. What do you think about this? Do you have any idea why it is failing?