I am trying to run Pytorch DDP and spawning the model and dataset. I have tried transferring model, source, and targets to CUDA but only source and model and actually transferred to CUDA while targets are on CPU. I tried everything I know but this error persists. I have checked: the targets are tensors with appropriate shapes, used.to(device), etc… The code is given below for reference. Thank you! Any suggestions will be helpful.
# -*- coding: utf-8 -*-
import torch, json
import numpy as np
from preprocessing.preprocess_dataloader import preprocess
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.optim as optim
import torch.nn as nn
from utils import save_checkpoint, load_checkpoint, \
define_model, init_weights, get_loaders
# read config file
config = json.load(open('config.json', 'r'))
def train_ddp(model, device, train_dataloader, optimizer, criterion):
model.train() # train mode is ON i.e. dropout and normalization tech. will be used
epoch_loss = 0
trg_seqs = open('logs/train_targets.txt', 'w')
pred_seqs = open('logs/train_predicted.txt', 'w')
vocab = json.load(open('data/vocab_stoi.json', 'r'))
#print('train_dataloader shape: ', len(train_dataloader))
#i,m = next(iter(train_dataloader))#.next()
#print('++++++++++=======++++++++++'*3)
print(device) # 0, 1
#print(m.shape)
#print('++++++++++=======++++++++++'*3)
for (img, mml) in train_dataloader:
#img, mml = data[0], data[1]
trg = mml
#trg=trg.permute(1,0) # trg: [len, B] --> [B, len]
print(trg.shape) # [64, 150]. where 64 is Batch size, and 150 is seq_len
trg.to(device)#, dtype=torch.int64)
print('trg on: ', trg.device) # cpu
print('trg is tensor?: ', torch.is_tensor(trg)) # True
batch_size = trg.shape[0]
img_arr=[]
for i in img:
img_arr.append(torch.load(f'data/image_tensors/{i}.txt'))
img_stack=torch.stack(img_arr, dim=0)
src = img_stack.to(device)
print('src on: ', src.device) # cuda:0 and cuda:1
print(src.shape) # [64, 3, 41, 266]
# setting gradients to zero
optimizer.zero_grad()
#output, pred, encoder, decoder = model(src, trg, vocab, True, True, 0.5)
output, pred = model(src, trg, vocab, False, True, 0.5)
# output, pred, encoder, decoder = model( tdi, vocab, True, True, 0.5 )
pred = pred.permute(1,0) # [B, len ]--> [len, B]
output = output.permute(1,0,2)
# translating and storing trg and pred sequences in batches
if write_file:
batch_size = trg.shape[1]
for idx in range(batch_size):
trg_arr = [vocab.itos[itrg] for itrg in trg[:,idx]]
trg_seq = " ".join(trg_arr)
trg_seqs.write(trg_seq + '\n')
pred_arr = [vocab.itos[ipred] for ipred in pred.int()[:,idx]]
pred_seq = " ".join(pred_arr)
pred_seqs.write(pred_seq+'\n')
#trg = [trg len, batch size]
trg = trg.permute(1,0)
#output = [B, trg len, output dim] --> [len, B, out]
output_dim = output.shape[-1]
output = output[1:].contiguous().view(-1, output_dim)
trg = trg[1:].view(-1)
loss = criterion(output, trg.to(torch.int64))
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
optimizer.step()
epoch_loss += loss.item()
net_loss = epoch_loss/len(train_dataloader)
return net_loss
def train_main(rank,world_size):
# (world_size, train_data, test_data, val_data, vocab)=args
print('getting dataloaders...')
loaders = get_loaders(rank, world_size)#,
# train_data, test_data, val_data)
# loaders = preprocess(rank,
# world_size,
# config["batch_size"],
# config["device"])
# unpack loaders_vocab
train_dataloader, test_dataloader, val_dataloader = loaders
model = define_model().to(rank)
print('model on: ', model.device) # 0, 1
print('MODEL: ')
#print(model.apply(init_weights))
ddp_model = DDP(model, device_ids=[rank])
# optimizer and loss
TRG_PAD_IDX = 0 # index of <pad> token in vocab file
optimizer = optim.Adam(model.parameters(), lr=0.0001)
criterion = nn.CrossEntropyLoss(ignore_index = TRG_PAD_IDX)
best_val_loss = np.inf
for epoch in range(config["epochs"]):
train_loss = train_ddp(ddp_model, rank, train_dataloader, optimizer, criterion)
print(train_loss)
The main function is:
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from utils import set_random_seed, define_model, split_and_create_vocab
from preprocessing.preprocess_dataloader import preprocess
from torch.nn.parallel import DistributedDataParallel as DDP
from train import train_main
from preprocessing.dataset import Img2MML_dataset
# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
# "gloo",
# rank=rank,
# init_method=init_method,
# world_size=world_size)
# For TcpStore, same way as on Linux.
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12358'
# initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
def demo_basic(rank,world_size):
print(f"Running basic DDP example on rank {rank}.")
setup(rank, world_size)
train_main(rank, world_size)
cleanup()
def run_demo(demo_fn, world_size):
# create folders
FOLDER = ['trained_models', 'logs']
for f in FOLDER:
if not os.path.exists(f):
os.mkdir(f)
# split dataset and create vocab before spawing
split_and_create_vocab()
print('going for spawn...')
mp.spawn(demo_fn,
args=(world_size,),
nprocs=world_size,
join=True)
if __name__ == "__main__":
# set seed
set_random_seed()
n_gpus = torch.cuda.device_count()
assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
world_size = n_gpus
run_demo(demo_basic, world_size)