Using DDP With A Multi-Task Model

On a single GPU, I’m able to train a multi-task model like this:

import random

model = MultiTaskBERT()
model.to(device)
data_task_A = iter(get_data_loader())
data_task_B = iter(get_data_loader())
max_steps = max(len(data_task_A), len(data_task_B))
task_probs = [0.5, 0.5]

for epoch in range(num_epochs):
    for step in range(max_steps):
        # Choose one of the tasks according to task_probs
        data_current = random.choices([data_task_A, data_task_B], weights=task_probs)[0]
        if step < len(data_current):
            batch = next(data_current)
            train(model, batch)

The block above converts the DataLoaders to iterators so that I can alternate between batches from the different tasks.

I tried to modify the script so that I could use DDP (single node, multi-GPU):

import random

from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from transformers.data.data_collator import default_data_collator

model = MultiTaskBERT()
model.to(device)
model = DDP(model, device_ids=[gpu_idx], find_unused_parameters=True)

ds_A = get_dataset()
ds_B = get_dataset()
sampler_A = DistributedSampler(ds_A, num_replicas=world_size, rank=rank)
sampler_B = DistributedSampler(ds_B, num_replicas=world_size, rank=rank)
data_task_A = iter(DataLoader(
                    ds_A,
                    batch_size=batch_size,
                    collate_fn=default_data_collator,
                    sampler=sampler_A
                ))
data_task_B = iter(DataLoader(
                    ds_B,
                    batch_size=batch_size,
                    collate_fn=default_data_collator,
                    sampler=sampler_B
                ))
max_steps = max(len(data_task_A), len(data_task_B))
task_probs = [0.5, 0.5]

for epoch in range(num_epochs):
    for step in range(max_steps):
        # Choose one of the tasks according to task_probs
        data_current = random.choices([data_task_A, data_task_B], weights=task_probs)[0]
        if step < len(data_current):
            batch = next(data_current)
            train(model, batch)

Training ran error-free, but I noticed that my accuracy scores were much worse than on a single GPU. I began debugging by training one task at a time and comparing to the single-GPU results. On a single GPU I can use the code from the first block to train a single task by simply setting the probability of every task to 0 except the one I want to train, which I set to 1.
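For example, to train only task A with the first block, I just set:

# Give task A all of the probability mass, so task B is never sampled
task_probs = [1.0, 0.0]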

The code below more or less matches my single-task, single-GPU results. The problem disappears when I enumerate my DataLoader instead of converting it to an iterator.

from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from transformers.data.data_collator import default_data_collator

model = MultiTaskBERT()
model.to(device)
model = DDP(model, device_ids=[gpu_idx], find_unused_parameters=True)

ds_A = get_dataset()
sampler_A = DistributedSampler(ds_A, num_replicas=world_size, rank=rank)
data_task_A = DataLoader(
                    ds_A,
                    batch_size=batch_size,
                    collate_fn=default_data_collator,
                    sampler=sampler_A
                )

for epoch in range(num_epochs):
    for i, current_batch in enumerate(data_task_A):
        train(model, current_batch)

My question is: how can I alternate between batches from the different tasks in a DDP setting? It appears that converting my DataLoader to an iterator messes with the distributed sampling… should that be happening?

Thanks!

If the dataset you are using is an IterableDataset, then I don’t believe that converting its DataLoader to an iterator should mess up distributed sampling. Since things seem to work for you when using a single DataLoader, a possible workaround is to combine the two tasks into one custom data loader and implement your own __next__() that picks a task according to the task probabilities; see the sketch below.
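Something along these lines is what I have in mind. It is only a rough sketch (the MultiTaskLoader name, the loader_A/loader_B names, and the restart-on-exhaustion policy are placeholders, not from your code); loader_A and loader_B would be the plain DataLoaders built with your DistributedSamplers, before any iter() call:

import random

class MultiTaskLoader:
    """Wraps two per-task DataLoaders and yields (task_idx, batch) pairs,
    picking the task for each step at random according to task_probs."""

    def __init__(self, loader_A, loader_B, task_probs):
        self.loaders = [loader_A, loader_B]
        self.task_probs = task_probs

    def __len__(self):
        # One "epoch" runs for as many steps as the longer of the two loaders
        return max(len(dl) for dl in self.loaders)

    def __iter__(self):
        # Fresh iterators every epoch, so nothing stays exhausted across epochs
        self.iters = [iter(dl) for dl in self.loaders]
        self.steps_left = len(self)
        return self

    def __next__(self):
        if self.steps_left == 0:
            raise StopIteration
        self.steps_left -= 1
        idx = random.choices([0, 1], weights=self.task_probs)[0]
        try:
            return idx, next(self.iters[idx])
        except StopIteration:
            # The chosen task ran out of batches: restart its iterator so the
            # longer task keeps training (skipping the step is another option)
            self.iters[idx] = iter(self.loaders[idx])
            return idx, next(self.iters[idx])

multi_loader = MultiTaskLoader(loader_A, loader_B, task_probs=[0.5, 0.5])

for epoch in range(num_epochs):
    sampler_A.set_epoch(epoch)  # reshuffle the DistributedSamplers each epoch
    sampler_B.set_epoch(epoch)
    for task_idx, batch in multi_loader:
        train(model, batch)

One thing to watch with DDP: if each rank seeds random differently, the ranks can pick different tasks on the same step, so you may want to seed identically on every rank to keep the task choice in sync across GPUs.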

I used an IterableDataset and got

ValueError: DataLoader with IterableDataset: expected unspecified sampler option, but got sampler=<torch.utils.data.distributed.DistributedSampler object at 0x7fab6f18e898>

That workaround idea sounds good; I will try it and post back.