TL;DR: a lock may be needed around optimizer.step() when training with multiprocessing.

The example in [MULTIPROCESSING BEST PRACTICES](https://pytorch.org/docs/stable/notes/multiprocessing.html) updates the shared parameters without any lock protection:
```python
import torch.multiprocessing as mp
from model import MyModel

def train(model):
    # Construct data_loader, optimizer, etc.
    for data, labels in data_loader:
        optimizer.zero_grad()
        loss_fn(model(data), labels).backward()
        optimizer.step()  # This will update the shared parameters

if __name__ == '__main__':
    num_processes = 4
    model = MyModel()
    # NOTE: this is required for the ``fork`` method to work
    model.share_memory()
    processes = []
    for rank in range(num_processes):
        p = mp.Process(target=train, args=(model,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
```
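For clarity, this is the locked variant I have in mind. It is only my own sketch on top of the docs example; the `lock` argument and its plumbing are my additions and are not part of the official snippet:

```python
import torch.multiprocessing as mp
from model import MyModel

def train(model, lock):
    # Construct data_loader, optimizer, etc.
    for data, labels in data_loader:
        optimizer.zero_grad()
        loss_fn(model(data), labels).backward()
        # Hold the lock so only one process updates the shared parameters at a time
        with lock:
            optimizer.step()

if __name__ == '__main__':
    num_processes = 4
    model = MyModel()
    model.share_memory()
    lock = mp.Lock()  # one lock shared by all workers
    processes = []
    for rank in range(num_processes):
        p = mp.Process(target=train, args=(model, lock))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
```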
I also checked some A3C code on GitHub, and it likewise calls optimizer.step() on the shared model without a lock.
To double-check whether this is safe, I created a test case:
- The global network is initialized with 0
- one worker adds 1 to it 1000 times
- another worker subtracts 1 from it 1000 times
- if the updates are safe, the final network's weight should be 0
- repeat 100 times to check whether it is safe
Test results:
- With lock protection, it's safe.
- Without lock protection (comment out the lock and unindent optimizer.step()), from time to time the weight does not come back to 0, which means it is not safe.
```python
import torch
import torch.nn as nn
import torch.multiprocessing as mp
import torch.optim as optim
from torch.autograd import Variable
import os

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(1, 1)
        self.fc1.weight.data.fill_(0.0)
        self.fc1.bias.data.fill_(0.0)

    def forward(self):
        return 0

def worker1(net, grad, lock):
    optimizer = optim.SGD(net.parameters(), lr=1)
    for _ in range(1000):
        optimizer.zero_grad()
        for param in net.parameters():
            param._grad = Variable(torch.ones_like(param))
        with lock:
            optimizer.step()
        # print(f'in process {os.getpid()}, the grad of weight is: {net.fc1.weight.grad.item()}, weight is {net.fc1.weight.item()}')

def worker2(net, grad, lock):
    optimizer = optim.SGD(net.parameters(), lr=1)
    for _ in range(1000):
        optimizer.zero_grad()
        for param in net.parameters():
            param._grad = -Variable(torch.ones_like(param))
        with lock:
            optimizer.step()
        # print(f'in process {os.getpid()}, the grad of weight is: {net.fc1.weight.grad.item()}, weight is {net.fc1.weight.item()}')

if __name__ == '__main__':
    n = Net()
    n.share_memory()
    lock = mp.Lock()
    for idx in range(100):
        # print(idx)
        # print(f'in main {os.getpid()}, weight is {n.fc1.weight.item()}')
        p1 = mp.Process(target=worker1, args=(n, 1.0, lock))
        p2 = mp.Process(target=worker2, args=(n, 1.0, lock))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        # print(f'in main {os.getpid()}, weight is {n.fc1.weight.item()}')
        if n.fc1.weight.item() != 0:
            print(f'{idx} in main {os.getpid()}, weight is {n.fc1.weight.item()}')
            break
    print('finished')
```
My PyTorch version is 1.4.0+cpu.
However, since the official "Hogwild" example and other examples work without a lock, maybe this safety issue is not that important…
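For completeness, the same lost-update race can be shown without any nn/optim machinery at all. This is only an illustrative sketch of mine with a plain shared tensor (not taken from the docs or the Hogwild example), and whether the drift actually appears in a given run depends on timing:

```python
import torch
import torch.multiprocessing as mp

def bump(t, delta):
    # Unprotected read-modify-write on the shared tensor: read t, add delta,
    # write it back. Two processes interleaving here can lose updates.
    for _ in range(1000):
        t += delta

if __name__ == '__main__':
    t = torch.zeros(1)
    t.share_memory_()
    p1 = mp.Process(target=bump, args=(t, 1.0))
    p2 = mp.Process(target=bump, args=(t, -1.0))
    p1.start(); p2.start()
    p1.join(); p2.join()
    # With perfectly serialized updates this would always print 0.0; when the
    # race hits, one worker's write overwrites the other's and the value drifts.
    print(t.item())
```

This is the same read-modify-write pattern that optimizer.step() performs on each shared parameter, which is why the test above can end with a nonzero weight when the lock is removed.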