Under what circumstances does DistributedDataParallel() not reduce training time?

I wrote a simple net to test DistributedDataParallel(), but

CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nproc_per_node 2 test_bert.py

with two GPUs does not take less time. So under what circumstances does DistributedDataParallel() not help? Or is there something wrong with my code?

import torch.nn as nn
import transformers
import config
import pandas as pd
from sklearn.utils import shuffle
import torch.distributed as dist
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
import torch
from tqdm import tqdm
import os
LOCAL_RANK = int(os.getenv('LOCAL_RANK', -1))  # https://pytorch.org/docs/stable/elastic/run.html
RANK = int(os.getenv('RANK', -1))
WORLD_SIZE = int(os.getenv('WORLD_SIZE', 1))
class Dataset:
    def __init__(self, review, target):
        self.review = review
        self.target = target
        self.tokenizer = transformers.BertTokenizer.from_pretrained(config.BERT_PATH)
        self.max_len = 512

    def __len__(self):
        return len(self.review)

    def __getitem__(self, item):
        review = str(self.review[item])
        review = " ".join(review.split())

        inputs = self.tokenizer.encode_plus(
            review,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',
            truncation=True
        )

        ids = inputs["input_ids"]
        mask = inputs["attention_mask"]
        token_type_ids = inputs["token_type_ids"]

        return {
            "ids": torch.tensor(ids, dtype=torch.float),
            "attention_mask": torch.tensor(mask, dtype=torch.float),
            "token_type_ids": torch.tensor(token_type_ids, dtype=torch.float),
            "review":  self.review[item],
            "targets": torch.tensor(self.target[item], dtype=torch.float),
        }
def loss_fn(outputs, targets):
    BCEcls = nn.BCEWithLogitsLoss().to(LOCAL_RANK)
    return BCEcls(outputs, targets.view(-1,1))

class DKANET(nn.Module):
    def __init__(self, concept_emd_glove=None):
        super(DKANET, self).__init__()
        self.bert = transformers.BertModel.from_pretrained(config.BERT_PATH)
        self.out = nn.Linear(768, 1)

    def forward(self, sentence):
        outputs = self.bert(input_ids=sentence['ids'],
                            attention_mask=sentence['attention_mask'],
                            token_type_ids=sentence['token_type_ids'])
        o = self.out(outputs.pooler_output)
        return o
def train(data_loader, model, optimizer):
    model.train()
    pbar = enumerate(data_loader)
    if RANK in [-1, 0]:
        pbar = tqdm(pbar, total=len(data_loader))
    for index, d in pbar:

        ids = d["ids"]
        attention_mask = d['attention_mask']
        token_type_ids = d['token_type_ids']
        target = d['targets']

        ids = ids.to(LOCAL_RANK, dtype=torch.long)
        attention_mask = attention_mask.to(LOCAL_RANK, dtype=torch.long)
        token_type_ids = token_type_ids.to(LOCAL_RANK, dtype=torch.long)
        target = target.to(LOCAL_RANK, dtype=torch.float)
        sentence = {
            "ids": ids,
            "attention_mask":attention_mask,
            "token_type_ids" : token_type_ids,
        }

        optimizer.zero_grad()
        outputs = model(sentence)
        loss = loss_fn(outputs, target)
        if RANK in [-1, 0]:
            pbar.set_description(f'loss: {loss:.3f}')
        loss.backward()
        optimizer.step()
def run():
    torch.cuda.set_device(LOCAL_RANK)
    dist.init_process_group(backend="nccl" if dist.is_nccl_available() else "gloo")
    path_d = 'dataset/books/books.csv'
    train_s = pd.read_csv(path_d)
    train_s = shuffle(train_s)
    train_s = train_s.reset_index(drop=True)
    train_data = Dataset(train_s.review.values, train_s.sentiment.values)
    datasampler_train = DistributedSampler(train_data, num_replicas=2, rank=0)
    train_data = torch.utils.data.DataLoader(
        train_data, num_workers=16, batch_size=config.TRAIN_BATCH_SIZE, sampler=datasampler_train)
    model = DKANET().to(LOCAL_RANK)
    model = DDP(model, device_ids=[LOCAL_RANK], output_device=LOCAL_RANK)

    #### optimizer initializer
    optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
    #######################
    for epoch in range(config.EPOCHS):
        train(train_data, model, optimizer)
def main():
    run()
    if WORLD_SIZE > 1 and RANK == 0:
        print('Destroying process group... ', end='')
        dist.destroy_process_group()
        print('Done.')
if __name__=='__main__':
    main()

Strangely, nn.DataParallel() worked for me.
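
The nn.DataParallel() run is not shown in the thread; a minimal sketch of what such a single-process setup might look like, reusing DKANET, config, and train_data (the Dataset instance, before it is wrapped in a DataLoader) from the script above, with the loader settings as assumptions:

# Hypothetical single-process DataParallel variant, not part of the original script.
model = DKANET().cuda()
model = nn.DataParallel(model)  # one process; each batch is split across all visible GPUs
train_loader = torch.utils.data.DataLoader(
    train_data, batch_size=config.TRAIN_BATCH_SIZE, shuffle=True, num_workers=4)
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
# train(train_loader, model, optimizer) then works as above, with LOCAL_RANK replaced by 'cuda'

Note that nn.DataParallel splits each batch across the GPUs inside one process, while DistributedDataParallel runs the full configured batch size in every process, so the two setups do different amounts of work per step.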

Hi, one issue I see is that you are spawning 16 data-loading workers in each distributed process, for a total of 32 processes just for reading data. Do you need that many workers? This might be creating too many processes on your machine, resulting in thrashing and a slowdown. Could you try setting num_workers=0 and comparing the training speed of local vs. distributed (ensuring the distributed training splits the data, which the script appears to do correctly)?
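
Concretely, the suggested experiment only changes the DataLoader call in run(); something along these lines, keeping the rest of the script as is:

# Same loader as in the script, but with worker processes disabled.
train_data = torch.utils.data.DataLoader(
    train_data, num_workers=0, batch_size=config.TRAIN_BATCH_SIZE, sampler=datasampler_train)

For the local baseline, the same launch command with --nproc_per_node 1 (and a single visible GPU) should work, since the script still needs init_process_group() to run.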

I did a couple of experiments; it seems that more GPUs means more time, and setting num_workers=0 doesn't help.
batch 8
gpus 2 num_worker 16 01:57
gpus 2 num_worker 4 01:56
gpus 2 num_worker 0 02:07

batch 8
gpus 1 num_worker 4 01:32
gpus 1 num_worker 0 01:43

Additionally, “gpus 2 num_worker 16 01:57” means that with num_workers=16 on 2 V100s it takes 01:57 (min:sec).
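
For reference, those MM:SS numbers appear to be wall-clock times per run; to measure an epoch explicitly rather than reading it off the progress bar, a small helper along these lines could be used (the helper name is made up; everything else comes from the script above):

import time

def timed_epoch(data_loader, model, optimizer):
    # Hypothetical helper: wall-clock time for one training epoch on this rank.
    torch.cuda.synchronize()               # finish pending GPU work before starting the clock
    start = time.time()
    train(data_loader, model, optimizer)   # train() from the script above
    torch.cuda.synchronize()
    elapsed = time.time() - start
    if RANK in [-1, 0]:
        print(f'epoch time: {int(elapsed // 60):02d}:{int(elapsed % 60):02d}')
    return elapsed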