Error with DDP, RNN, and CSV

Hi, I am running into this error:

RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by (1) passing the keyword argument find_unused_parameters=True to torch.nn.parallel.DistributedDataParallel; (2) making sure all forward function outputs participate in calculating loss. If you already have done the above two steps, then the distributed data parallel module wasn’t able to locate the output tensors in the return value of your module’s forward function. Please include the loss function and the structure of the return value of forward of your module when reporting this issue (e.g. list, dict, iterable).

Given the following code:

class CSVDataset(Dataset):
    def __init__(self, path, filesize, chunksize):
        self.path = path
        self.chunksize = chunksize
        self.filesize = filesize
        nb_samples = int(subprocess.check_output(f'wc -l {path}', shell=True, text=True).split()[0])
        self.len = nb_samples // self.chunksize

    def __getitem__(self, index):
        x = pd.read_csv(
                self.path,
                skiprows=index * self.filesize + 1,
                chunksize=self.chunksize,
                names=['src','dst','time', 'edgetype','UUID', 'roots'], nrows=self.filesize)
        return x

    def __len__(self):
        return self.len
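
Because chunksize is passed to pd.read_csv, each __getitem__ call returns a pandas chunk reader rather than a single DataFrame, and I iterate over it to get DataFrames of chunksize rows. Rough usage sketch (csvDataset is the instance constructed at the bottom of the snippet):

reader = csvDataset[0]            # pandas TextFileReader, not a DataFrame
for chunk in reader:              # each chunk is a DataFrame of `chunksize` rows
    print(chunk.shape)            # e.g. (50000, 6) for the six columns above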


def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()


class Net(nn.Module):
    def __init__(self, in_channels, hidden_channels, out_channels, num_layers):
        super(Net, self).__init__()
        self.num_layers = num_layers
        self.in_channels = in_channels
        self.lin1 = torch.nn.Linear(in_channels, hidden_channels)
        self.lin2 = torch.nn.Linear(in_channels, hidden_channels)
        self.lin3 = torch.nn.Linear(in_channels, hidden_channels)
        self.pre = torch.nn.GRU(hidden_channels, hidden_channels, num_layers, dropout = 0.3)
        self.lin4 = torch.nn.Linear(2*hidden_channels, hidden_channels)
        self.lin5 = torch.nn.Linear(hidden_channels, out_channels)

    def forward(self, x, h, softMax=False, roots=None):
        if x.shape[0] == self.in_channels:
            x = F.relu(self.lin1(x))
        if h.shape[0] == self.in_channels:
            h = F.relu(self.lin2(h))
        h = h.unsqueeze(0)
        h = h.repeat(self.num_layers, 1).unsqueeze(1)
        x = x.unsqueeze(0).unsqueeze(0)
        temp1, h = self.pre(x, h)
        h = h[-1]
        h2 = F.relu(self.lin3(roots)).unsqueeze(0)
        h3 = self.lin4(torch.cat([h, h2], 1).squeeze())
        out = None
        if softMax:
            out = self.lin5(h3)
        return h3, out
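
For reference, a single forward call with dummy tensors of the sizes I actually use (64 input features, 256 hidden, 10 classes, 10 GRU layers) looks roughly like this:

net = Net(64, 256, 10, 10)                  # same sizes as Net(num_features, 256, num_classes, 10) below
x = torch.randn(64)                         # raw feature vector; matches in_channels, so lin1 is applied
h = torch.randn(64)                         # raw previous state; matches in_channels, so lin2 is applied
r = torch.randn(64)                         # roots features, fed through lin3
h3, out = net(x, h, softMax=True, roots=r)
# h3 has shape (256,), out has shape (10,); with softMax=False, out is None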

def embed(txt):
    return embedModels(txt)

def process(csvStream, model, optimizer, scheduler, lossFunction, embedModel, train=False, flag=False, device=None):
    global batch_size, grad_clip
    if train:
        model.train()
    else:
        model.eval()
    gLoss = None
    meanCount = 0
    dic = {}
    for i in range(len(csvStream)):
        uuid1, sent1, uuid2, sent2, label = csvStream[i]
        if uuid1 not in dic:
            sent1 = embed(sent1)
        if uuid2 not in dic:
            sent2 = embed(sent2)
        newSent2, out = model(sent1, sent2, softMax=True)
        if train:
            loss = lossFunction(out.unsqueeze(0).to(device), torch.LongTensor([label]).to(device))
            meanCount += 1
            if gLoss is None:
                gLoss = loss
            else:
                gLoss += loss
        dic[uuid2] = newSent2
        if meanCount == batch_size:
            gLoss /= batch_size
            gLoss.backward()
            if grad_clip >= 0.0:
                torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
            optimizer.step()
            scheduler.step(gLoss)
            gLoss = None
            meanCount = 0
    if gLoss is not None:
        gLoss /= meanCount
        gLoss.backward()
        if grad_clip >= 0.0:
            torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
        optimizer.step()
        scheduler.step(gLoss)

def distribute(rank, world_size):
    setup(rank, world_size)
    # create model and move it to GPU with id rank
    model = Net(num_features, 256, num_classes, 10).to(rank)
    ddp_model = DDP(model, device_ids=[rank], find_unused_parameters=True)

    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        print(CHECKPOINT_PATH)
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # Use a barrier() to make sure that process 1 loads the model after process
    # 0 saves it.
    dist.barrier()
    # configure map_location properly
    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location))
    eM = embedModels
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=lr, verbose=True)
    lossFunction = nn.CrossEntropyLoss()
    optimizer.zero_grad()
    df = csvDataset[rank]
    for epoch in range(epochs):
        for chunk in df:
            gatherLabels, gatherLoss = process(chunk, ddp_model, optimizer, scheduler, lossFunction, eM, train=True, flag=True, device=rank)
            wandb.log({f"accuracy:{rank}": sum(gatherLabels)/len(gatherLabels)})
            wandb.log({f"loss:{rank}": sum(gatherLoss)/len(gatherLoss)})
    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()

pathToFile = 'dataset.csv'
num_features = 64
embedModels = doc2vec
labels = list(range(10))
num_classes = len(labels)
batch_size = 500
chunkSize = 50000
fileSize = 500000

csvDataset = CSVDataset(pathToFile, filesize=fileSize, chunksize=chunkSize)
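
The launcher is not shown above; it is the usual torch.multiprocessing.spawn pattern, one process per GPU, roughly:

import torch.multiprocessing as mp

if __name__ == "__main__":
    world_size = torch.cuda.device_count()   # one process per GPU
    mp.spawn(distribute, args=(world_size,), nprocs=world_size, join=True)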

If I include find_unused_parameters=True as part of the DDP construction, I get the following error:

RuntimeError: Expected to mark a variable ready only once. This error is caused by the following reasons: 1) Use of a module parameter outside the forward function. Please make sure model parameters are not shared across multiple concurrent forward-backward passes. 2) Reused parameters in multiple reentrant backward passes. For example, if you use multiple checkpoint functions to wrap the same part of your model, it would result in the same set of parameters been used by different reentrant backward passes multiple times, and hence marking a variable ready multiple times. DDP does not support such use cases yet.

Looking this error up, it seems that setting find_unused_parameters=True is what causes it. If I run this code non-distributed, it works fine. I have tried running the following check between my loss.backward() and optimizer.step():

for name, param in model.named_parameters():
    if param.grad is None:
        print(name)

but this does not print anything. I am not sure what I am missing.

Looks like you’re calling forward multiple times before calling backward once. This is unlikely to give you the result you want: DDP kicks off a gradient reduction for every forward call and expects the matching backward to finish before the next forward starts, which is what the first error is complaining about.

If you’re trying to implement grad accumulation, take a look at: DistributedDataParallel — PyTorch master documentation
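
The relevant piece there is DDP's no_sync() context manager, which skips the gradient all-reduce on the accumulation steps. A rough sketch (accum_steps, batches, and compute_loss are placeholders, not names from your code):

accum_steps = 4
optimizer.zero_grad()
for step, batch in enumerate(batches):
    if (step + 1) % accum_steps != 0:
        with ddp_model.no_sync():                     # forward + backward inside: gradients stay local
            loss = compute_loss(ddp_model, batch) / accum_steps
            loss.backward()
    else:
        loss = compute_loss(ddp_model, batch) / accum_steps
        loss.backward()                               # gradients are all-reduced here
        optimizer.step()
        optimizer.zero_grad()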