i have write a simple net to test DistributedDataParallel() ,but
CUDA_VISIBLE_DEVICES="0,1" python -m torch.distributed.launch --nproc_per_node 2 test_bert.py
with two gpu doesn’t use less time. So Under what circumstances, DistributedDataParallel() doesn’t work.Or is there something wrong with my code?
import torch.nn as nn
import transformers
import config
import pandas as pd
from sklearn.utils import shuffle
import torch.distributed as dist
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
import torch
from tqdm import tqdm
import os
LOCAL_RANK = int(os.getenv('LOCAL_RANK', -1)) # https://pytorch.org/docs/stable/elastic/run.html
RANK = int(os.getenv('RANK', -1))
WORLD_SIZE = int(os.getenv('WORLD_SIZE', 1))
class Dataset:
def __init__(self, review, target):
self.review = review
self.target = target
self.tokenizer = transformers.BertTokenizer.from_pretrained(config.BERT_PATH)
self.max_len = 512
def __len__(self):
return len(self.review)
def __getitem__(self, item):
review = str(self.review[item])
review = " ".join(review.split())
inputs = self.tokenizer.encode_plus(
review,
None,
add_special_tokens=True,
max_length=self.max_len,
padding='max_length',
truncation=True
)
ids = inputs["input_ids"]
mask = inputs["attention_mask"]
token_type_ids = inputs["token_type_ids"]
return {
"ids": torch.tensor(ids, dtype=torch.float),
"attention_mask": torch.tensor(mask, dtype=torch.float),
"token_type_ids": torch.tensor(token_type_ids, dtype=torch.float),
"review": self.review[item],
"targets": torch.tensor(self.target[item], dtype=torch.float),
}
def loss_fn(outputs, targets):
BCEcls = nn.BCEWithLogitsLoss().to(LOCAL_RANK)
return BCEcls(outputs, targets.view(-1,1))
class DKANET(nn.Module):
def __init__(self,concept_emd_glove=None,):
super(DKANET, self).__init__()
self.bert = transformers.BertModel.from_pretrained(config.BERT_PATH)
self.out = nn.Linear(768, 1)
def forward(self,sentence):
outputs = self.bert(input_ids=sentence['ids'],
attention_mask=sentence['attention_mask'],
token_type_ids=sentence['token_type_ids'])
o = self.out(outputs.pooler_output)
return o
def train(data_loader, model, optmizer):
model.train()
pbar = enumerate(data_loader)
if RANK in [-1, 0]:
pbar = tqdm(pbar, total=len(data_loader))
for index, d in pbar:
ids = d["ids"]
attention_mask = d['attention_mask']
token_type_ids = d['token_type_ids']
target = d['targets']
ids = ids.to(LOCAL_RANK, dtype=torch.long)
attention_mask = attention_mask.to(LOCAL_RANK, dtype=torch.long)
token_type_ids = token_type_ids.to(LOCAL_RANK, dtype=torch.long)
target = target.to(LOCAL_RANK, dtype=torch.float)
sentence = {
"ids": ids,
"attention_mask":attention_mask,
"token_type_ids" : token_type_ids,
}
optmizer.zero_grad()
outputs = model(sentence)
loss = loss_fn(outputs, target)
if RANK in [-1, 0]:
pbar.set_description(f'loss{loss},sent:{format(loss,".3f")}')
loss.backward()
optmizer.step()
def run():
torch.cuda.set_device(LOCAL_RANK)
dist.init_process_group(backend="nccl" if dist.is_nccl_available() else "gloo")
path_d = 'dataset/books/books.csv'
train_s = pd.read_csv(path_d)
train_s = shuffle(train_s)
train_s = train_s.reset_index(drop=True)
train_data = Dataset(train_s.review.values, train_s.sentiment.values)
datasampler_train = DistributedSampler(train_data, num_replicas=2, rank=0)
train_data = torch.utils.data.DataLoader(
train_data, num_workers=16,batch_size=config.TRAIN_BATCH_SIZE,sampler=datasampler_train)
model = DKANET().to(LOCAL_RANK)
model = nn.parallel.DistributedDataParallel(model,device_ids=[LOCAL_RANK], output_device=LOCAL_RANK)
#### optimizer initializer
optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
#######################
for epoch in range(config.EPOCHS):
train(train_data, model, optimizer)
def main():
run()
if WORLD_SIZE > 1 and RANK == 0:
_ = [print('Destroying process group... ', end=''), dist.destroy_process_group(), print('Done.')]
if __name__=='__main__':
main()