Hi, I am running into this error:
RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by (1) passing the keyword argument
find_unused_parameters=True
totorch.nn.parallel.DistributedDataParallel
; (2) making sure allforward
function outputs participate in calculating loss. If you already have done the above two steps, then the distributed data parallel module wasn’t able to locate the output tensors in the return value of your module’sforward
function. Please include the loss function and the structure of the return value offorward
of your module when reporting this issue (e.g. list, dict, iterable).
Given the following code:
class CSVDataset(Dataset):
def __init__(self, path, filesize, chunksize):
self.path = path
self.chunksize = chunksize
self.filesize = filesize
nb_samples = int(subprocess.check_output(f'wc -l {path}', shell=True, text=True).split()[0])
self.len = nb_samples / self.chunksize
def __getitem__(self, index):
x = pd.read_csv(
self.path,
skiprows=index * self.filesize +1,
chunksize=self.chunksize,
names=['src','dst','time', 'edgetype','UUID', 'roots'], nrows=self.filesize)
return x
def __len__(self):
return self.len
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
class Net(nn.Module):
def __init__(self, in_channels, hidden_channels, out_channels, num_layers):
super(Net, self).__init__()
self.num_layers = num_layers
self.in_channels = in_channels
self.lin1 = torch.nn.Linear(in_channels, hidden_channels)
self.lin2 = torch.nn.Linear(in_channels, hidden_channels)
self.lin3 = torch.nn.Linear(in_channels, hidden_channels)
self.pre = torch.nn.GRU(hidden_channels, hidden_channels, num_layers, dropout = 0.3)
self.lin4 = torch.nn.Linear(2*hidden_channels, hidden_channels)
self.lin5 = torch.nn.Linear(hidden_channels, out_channels)
def forward(self, x, h, softMax=False, roots=None):
if x.shape[0] == self.in_channels:
x = F.relu(self.lin1(x))
if h.shape[0] == self.in_channels:
h = F.relu(self.lin2(h))
h = h.unsqueeze(0)
h = h.repeat(self.num_layers, 1).unsqueeze(1)
x = x.unsqueeze(0).unsqueeze(0)
temp1, h = self.pre(x, h)
h = h[-1]
h2 = F.relu(self.lin3(roots)).unsqueeze(0)
h3 = self.lin4(torch.cat([h, h2], 1).squeeze())
out = None
if softMax:
out = self.lin5(h3)
return h3, out
def embed(txt):
return embedModels(txt)
def process(csvStream, model, optimizer, scheduler, lossFunction, embedModel, device = None):
global batch_size, grad_clip
if train:
model.train()
else:
model.eval()
gLoss = None
meanCount = 0
dic = {}
for i in range(len(edge_stream)):
uuid1, sent1, uuid2, sent2, label = csvStream[i]
if uuid1 not in dic:
sent1 = embed(sent1)
if uuid2 not in dic:
sent2 = embed(sent2)
newSent2, out = model(sent1, sent2 softMax=True)
if train:
loss = lossFunction(out.unsqueeze(0).to(device), torch.LongTensor([label]).to(device))
meanCount += 1
if gLoss is None:
gLoss = loss
else:
gLoss += loss
dic[uuid2] = newSent2
if meanCount == batch_size:
gLoss /= batch_size
gLoss.backward()
if grad_clip >= 0.0:
torch.nn.utils.clip_grad_norm(model.parameters(), grad_clip)
optimizer.step()
scheduler.step(gLoss)
gLoss = None
meanCount = 0
if gLoss is not None:
gLoss /= meanCount
gLoss.backward()
if grad_clip >= 0.0:
torch.nn.utils.clip_grad_norm(model.parameters(), grad_clip)
optimizer.step()
scheduler.step(gLoss)
def distribute(rank, world_size):
setup(rank, world_size)
# create model and move it to GPU with id rank
model = Net(num_features, 256, num_classes, 10).to(rank)
ddp_model = DDP(model, device_ids=[rank], find_unused_parameters=True)
CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
if rank == 0:
# All processes should see same parameters as they all start from same
# random parameters and gradients are synchronized in backward passes.
# Therefore, saving it in one process is sufficient.
print(CHECKPOINT_PATH)
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# Use a barrier() to make sure that process 1 loads the model after process
# 0 saves it.
dist.barrier()
# configure map_location properly
map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
ddp_model.load_state_dict(
torch.load(CHECKPOINT_PATH, map_location=map_location))
eM = embedModels
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=lr, verbose=True)
lossFunction = nn.CrossEntropyLoss()
optimizer.zero_grad()
df = csvDataset.__getitem__(rank)
for epoch in range(epochs):
for chunk in df:
gatherLabels, gartherLoss = process(chunk, ddp_model, optimizer, scheduler, lossFunction, eM, train = True, flag = True, device = rank)
wandb.log({"accuracy:{rank}": sum(gatherLabels)/len(gatherLabels)})
wandb.log({"loss:{rank}": sum(gartherLoss)/len(gartherLoss)})
if rank == 0:
os.remove(CHECKPOINT_PATH)
cleanup()
pathToFile = 'dataset.csv'
num_features = 64
embedModels = doc2vec
labels = list(range(10))
num_classes = len(labels)
batch_size = 500
chunkSize = 50000
fileSize = 500000
csvDataset = CSVDataset(pathToFile, filesize=fileSize, chunksize=chunkSize)
If I include find_unused_parameters=True as part of the DDP construction, I get the following error:
RuntimeError: Expected to mark a variable ready only once. This error is caused by the following reasons: 1) Use of a module parameter outside the
forward
function. Please make sure model parameters are not shared across multiple concurrent forward-backward passes2) Reused parameters in multiple reentrant backward passes. For example, if you use multiplecheckpoint
functions to wrap the same part of your model, it would result in the same set of parameters been used by different reentrant backward passes multiple times, and hence marking a variable ready multiple times. DDP does not support such use cases yet.
Looking this error up, it seems that setting find_unused_parameters=True causes it to occur. If I run this code non-distributed, it works fine. I have tried to run the following code between my loss.backward() and optimizer.step()
for name, param in model.named_parameters():
if param.grad is None:
print(name)
but this does not print anything. I am not sure what I am missing.