Hi there,
I am trying to use PyTorch DDP for the first time, but facing this error: RuntimeError: stack expects each tensor to be equal size, but got [54] at entry 0 and [126] at entry 1
.
When I used a toy example provided on the PyTorch documentation it worked perfectly. But When I introduced my own dataset, using DistributedSampler as provided, It introduced this error.
class Img2MML_dataset(Dataset):
def __init__(self, dataframe, vocab, tokenizer):
self.dataframe = dataframe
for l in range(len(self.dataframe)):
eqn = self.dataframe.iloc[l, 1]
indexed_eqn = []
for token in tokenizer(eqn):
if token in vocab.keys(): # vocab[token] != None:
indexed_eqn.append(vocab[token])
else:
indexed_eqn.append(vocab['<unk>'])
self.dataframe.iloc[l, 1] = torch.Tensor(indexed_eqn)
def __len__(self):
return len(self.dataframe)
def __getitem__(self, index):
return self.dataframe.iloc[index, 0], self.dataframe.iloc[index, 1]
def get_loaders(rank, world_size):#, train_data, test_data, val_data):
# initializing pad collate class
#mypadcollate = My_pad_collate(rank)
# define tokenizer function
tokenizer = lambda x: x.split()
# loading train, test, and vocab preprocessed data and vocab
vocab = json.load(open('data/vocab_stoi.json', 'r'))
train_data = pd.read_csv('data/train_i2s.csv')
#print('BATCH SIZE: ',config["batch_size"])
# initailizing class Img2MML_dataset: train dataloader
imml_train = Img2MML_dataset(train_data,
vocab,
tokenizer)
''' FOR DDP '''
# if ddp:
# Create distributed sampler pinned to rank
train_sampler = DistributedSampler(imml_train,
num_replicas=world_size,
rank=rank,
shuffle=True, # May be True
seed=42)
# creating dataloader
train_dataloader = DataLoader(imml_train,
sampler=train_sampler,
batch_size=62,#config["batch_size"],
num_workers=0,#config["num_workers"],
shuffle=False,
#collate_fn=mypadcollate,
pin_memory=True)
return (train_dataloader)
def dataloaders( rank, world_size):
print('getting dataloaders...')
train_dataloader = get_loaders(rank, world_size)
print('train_dataloader shape: ', len(train_dataloader))
i,m = next(iter(train_dataloader))
I didn’t even call this data loader in the model itself. It starts throwing errors at i,m = next(iter(train_dataloader))
.
Here is the complete script:
import os, json
import pandas as pd
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.utils.data import Dataset, DataLoader, DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
# "gloo",
# rank=rank,
# init_method=init_method,
# world_size=world_size)
# For TcpStore, same way as on Linux.
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
import torch
from torch.utils.data import Dataset
class Img2MML_dataset(Dataset):
def __init__(self, dataframe, vocab, tokenizer):
self.dataframe = dataframe
for l in range(len(self.dataframe)):
eqn = self.dataframe.iloc[l, 1]
indexed_eqn = []
for token in tokenizer(eqn):
if token in vocab.keys(): # vocab[token] != None:
indexed_eqn.append(vocab[token])
else:
indexed_eqn.append(vocab['<unk>'])
self.dataframe.iloc[l, 1] = torch.Tensor(indexed_eqn)
def __len__(self):
return len(self.dataframe)
def __getitem__(self, index):
return self.dataframe.iloc[index, 0], self.dataframe.iloc[index, 1]
def get_loaders(rank, world_size):#, train_data, test_data, val_data):
# initializing pad collate class
#mypadcollate = My_pad_collate(rank)
# define tokenizer function
tokenizer = lambda x: x.split()
# loading train, test, and vocab preprocessed data and vocab
vocab = json.load(open('data/vocab_stoi.json', 'r'))
train_data = pd.read_csv('data/train_i2s.csv')
#print('BATCH SIZE: ',config["batch_size"])
# initailizing class Img2MML_dataset: train dataloader
imml_train = Img2MML_dataset(train_data,
vocab,
tokenizer)
''' FOR DDP '''
# if ddp:
# Create distributed sampler pinned to rank
train_sampler = DistributedSampler(imml_train,
num_replicas=world_size,
rank=rank,
shuffle=True, # May be True
seed=42)
# creating dataloader
train_dataloader = DataLoader(imml_train,
sampler=train_sampler,
batch_size=62,#config["batch_size"],
num_workers=0,#config["num_workers"],
shuffle=False,
#collate_fn=mypadcollate,
pin_memory=True)
return (train_dataloader)
def dataloaders( rank, world_size):
print('getting dataloaders...')
train_dataloader = get_loaders(rank, world_size)
print('train_dataloader shape: ', len(train_dataloader))
i,m = next(iter(train_dataloader))
def demo_basic(rank, world_size):
print(f"Running basic DDP example on rank {rank}.")
setup(rank, world_size)
# create model and move it to GPU with id rank
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
dataloaders(rank, world_size)
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
def run_demo(demo_fn, world_size):
mp.spawn(demo_fn,
args=(world_size,),
nprocs=world_size,
join=True)
if __name__ == "__main__":
n_gpus = torch.cuda.device_count()
assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
world_size = n_gpus
run_demo(demo_basic, world_size)