Hi @ptrblck 
Yes, you are right. Dask is a library that allows doing distributed calculations in a very nice and easy way.
I will need to check that.
I have prepared some small codes to reproduce that. Suppose you have this linear regression:
import torch
import numpy as np
class NeuralNetwork(torch.nn.Module):
def __init__(self, input_size, output_size):
super(NeuralNetwork, self).__init__()
self.linear1 = torch.nn.Linear(input_size, 10)
self.linear2 = torch.nn.Linear(10, output_size)
def forward(self, X):
outputs = self.linear1(X)
outputs = self.linear2(outputs)
return outputs
# Data
x = np.random.rand(10, 1)
y = np.random.rand(10, 1)
x = torch.tensor(x, dtype=torch.float)
y = torch.tensor(y, dtype=torch.float)
# Model stuff
model = NeuralNetwork(1, 1)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-6)
# Training
for epoch in range(10000):
optimizer.zero_grad()
outputs = model(x)
criterion = torch.nn.MSELoss()
loss = criterion(outputs, y)
loss.backward()
optimizer.step()
print(epoch, loss.item())
Using dask, I can do distributed training if I modify that code this way:
import numpy as np
import dask
from dask.distributed import Client, LocalCluster
def get_chunks(sequence, chunk_size, svm=True):
"""A function that yields a list in chunks"""
res = []
if svm is False and isinstance(sequence, dict):
sequence = sequence.items()
for item in sequence:
res.append(item)
if len(res) >= chunk_size:
yield res
res = []
if res:
yield res # yield the last, incomplete, portion
def train_batches(index, chunk, targets, model):
outputs = model(chunk)
criterion = torch.nn.MSELoss()
loss = criterion(outputs, targets[index])
loss.backward()
gradients = []
for param in model.parameters():
gradients.append(param.grad.detach().numpy())
return outputs, loss, gradients
class NeuralNetwork(torch.nn.Module):
def __init__(self, input_size, output_size):
super(NeuralNetwork, self).__init__()
self.linear1 = torch.nn.Linear(input_size, 10)
self.linear2 = torch.nn.Linear(10, output_size)
def forward(self, X):
outputs = self.linear1(X)
outputs = self.linear2(outputs)
return outputs
if __name__ == '__main__':
cluster = LocalCluster(n_workers=4, threads_per_worker=2)
client = Client(cluster)
# Data
x = np.random.rand(10, 1)
y = np.random.rand(10, 1)
x = list(get_chunks(x, 5, svm=False))
y = list(get_chunks(y, 5, svm=False))
x = torch.tensor(x, dtype=torch.float)
y = torch.tensor(y, dtype=torch.float)
# Model stuff
model = NeuralNetwork(1, 1)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-6)
# Training
for epoch in range(10000):
optimizer.zero_grad()
accumulation = []
for index, chunk in enumerate(x):
accumulation.append(client.submit(train_batches, *(index, chunk, y,
model)))
dask.distributed.wait(accumulation)
accumulation = client.gather(accumulation)
losses = []
outputs = []
grads = []
for index, chunk in enumerate(accumulation):
outputs = chunk[0]
loss = chunk[1]
losses.append(loss)
grad = np.array(chunk[2])
grads.append(grad)
grads = sum(grads)
for index, param in enumerate(model.parameters()):
param.grad = torch.tensor(grads[index])
loss = sum(losses)
optimizer.step()
print(epoch, loss)
This works because one can see how the loss function is decreasing with each epoch.
If I go deeper and add a client.submit()
inside forward()
:
def forward(self, X):
client = dask.distributed.get_client()
outputs = self.linear1(X)
outputs = client.submit(self.linear2, outputs)
dask.distributed.wait(outputs)
outputs = client.gather(outputs)
return outputs
Then Pytorch does not create the gradients even though the outputs are computed correctly:
result[0] = yield future
File "/home/muammar/.local/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
value = future.result()
File "/home/muammar/.local/lib/python3.7/site-packages/tornado/gen.py", line 736, in run
yielded = self.gen.throw(*exc_info) # type: ignore
File "/home/muammar/.local/lib/python3.7/site-packages/distributed-1.26.0+17.gd9143a0e-py3.7.egg/distributed/client.py", line 1497, in _gather
traceback)
File "/usr/lib/python3/dist-packages/six.py", line 692, in reraise
raise value.with_traceback(tb)
File "train.py", line 31, in train_batches
gradients.append(param.grad.detach().numpy())
AttributeError: 'NoneType' object has no attribute 'detach'
For this toy example, doing this is useless. However for the dictionary in the forward()
I showed in the first post this would be faster (I would expect). What do you think about this? Do you have any idea why is that failing?