Hello, I am trying to make my training workflow run on multiple GPUs. Since torch.nn.DataParallel did not work out for me (see this discussion), I am now trying to go with torch.nn.parallel.DistributedDataParallel (DDP). However, I am not sure how to use the TensorBoard logger when doing distributed training. Previous questions about this topic remain unanswered (here or here).
I have set up a typical training workflow that runs fine without DDP (use_distributed_training=False) but fails when using it with the error: TypeError: cannot pickle '_io.BufferedWriter' object.
Is there any way to make this code run, using both TensorBoard and DDP?
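From what I understand (this is my reading of the traceback, not verified against the internals), mp.spawn pickles everything in args to ship it to the worker processes, and the SummaryWriter holds an open event file that pickle rejects. I believe even a snippet as small as this would hit the same error:

from tensorboardX import SummaryWriter
import torch.multiprocessing as mp

def worker(rank, writer):
    pass

if __name__ == '__main__':
    writer = SummaryWriter('temp_dir')
    # args are pickled and sent to each spawned process; the writer's
    # open event file is an _io.BufferedWriter, which pickle cannot handle
    mp.spawn(worker, args=(writer,), nprocs=2, join=True)

My full code is below: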
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from tensorboardX import SummaryWriter
from torch import nn
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from torch.distributions import Laplace
class ToyNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.dens1 = nn.Linear(in_features=16, out_features=3)

    def forward(self, x):
        x = self.dens1(x)
        x = Laplace(x, torch.tensor([1.0]))
        return x
class RandomDataset(Dataset):
    def __getitem__(self, index):
        sample = {'mod1': torch.rand(1, 16).float(),
                  'mod2': torch.rand(1, 16).float(),
                  'mod3': torch.rand(1, 16).float()}
        label = torch.randint(0, 1, (3,)).float()
        return sample, label

    def __len__(self):
        return 20
def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()
class Experiment:
    def __init__(self, distributed, dir_logs):
        # initialize summary writer
        if not os.path.exists(dir_logs):
            os.makedirs(dir_logs)
        self.logger = SummaryWriter(dir_logs)
        self.model = ToyNet()
        self.rank = None
        self.distributed = distributed
        if distributed:
            self.world_size = torch.cuda.device_count()
            assert self.world_size > 1, 'At least 2 GPUs need to be accessible to use distributed training'
        else:
            self.world_size = 1
def train(exp: Experiment):
    rank = exp.rank
    if exp.distributed:
        # move the model to this process's GPU before wrapping it in DDP
        model = DDP(exp.model.to(rank), device_ids=[rank])
        sampler = DistributedSampler(RandomDataset(), num_replicas=exp.world_size, rank=rank)
    else:
        model = exp.model.to(rank)
        sampler = None
    rand_loader = DataLoader(dataset=RandomDataset(), batch_size=8, shuffle=False,
                             pin_memory=True, sampler=sampler, num_workers=0)
    mse_loss = nn.MSELoss()
    for step, (batch, label) in enumerate(rand_loader):
        label = label.to(rank)
        batch = {k: v.to(rank) for k, v in batch.items()}
        for modality in batch.keys():
            output = model(batch[modality]).mean
            loss = mse_loss(output, label)
            exp.logger.add_scalars('train/loss',
                                   {'train_loss': loss.item()},
                                   step)
def validate(exp: Experiment):
    model = exp.model.eval()
    rank = exp.rank
    with torch.no_grad():
        if exp.distributed:
            sampler = DistributedSampler(RandomDataset(), num_replicas=exp.world_size, rank=rank)
        else:
            sampler = None
        rand_loader = DataLoader(dataset=RandomDataset(), batch_size=8, shuffle=False,
                                 pin_memory=True, sampler=sampler, num_workers=0)
        mse_loss = nn.MSELoss()
        for step, (batch, label) in enumerate(rand_loader):
            label = label.to(rank)
            batch = {k: v.to(rank) for k, v in batch.items()}
            for modality in batch.keys():
                output = model(batch[modality]).mean
                loss = mse_loss(output, label)
                exp.logger.add_scalars('val/loss',
                                       {'val_loss': loss.item()},
                                       step)
def run_epochs(rank, exp: Experiment):
    print(f"Running basic DDP example on rank {rank}.")
    exp.rank = rank
    if exp.distributed:
        setup(rank, exp.world_size)
    for epoch in range(5):
        train(exp)
        validate(exp)
    if exp.distributed:
        cleanup()
    print('done!')
if __name__ == '__main__':
    log_dir = 'temp_dir'
    use_distributed_training = True
    ex = Experiment(use_distributed_training, log_dir)
    if ex.distributed:
        # mp.spawn pickles `ex` (including the SummaryWriter) for each
        # child process -- this is where the TypeError is raised
        mp.spawn(run_epochs,
                 args=(ex,),
                 nprocs=ex.world_size,
                 join=True)
    else:
        run_epochs(torch.device('cuda'), ex)
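One workaround I have been considering, though I am not sure it is the intended pattern: store only the log directory in Experiment, and create the SummaryWriter lazily inside each spawned process (only on rank 0), so no open file handle ever crosses the process boundary. A rough sketch of the change I have in mind (the rank-0 guard is my assumption about how people usually do this):

class Experiment:
    def __init__(self, distributed, dir_logs):
        if not os.path.exists(dir_logs):
            os.makedirs(dir_logs)
        self.dir_logs = dir_logs  # keep only the path; no writer yet
        self.logger = None        # created later, inside each spawned process
        self.model = ToyNet()
        self.rank = None
        self.distributed = distributed
        self.world_size = torch.cuda.device_count() if distributed else 1

def run_epochs(rank, exp: Experiment):
    exp.rank = rank
    if exp.distributed:
        setup(rank, exp.world_size)
    if rank == 0 or not exp.distributed:
        # the writer is created after spawning, so it is never pickled;
        # only rank 0 writes, to avoid clashing event files
        exp.logger = SummaryWriter(exp.dir_logs)
    for epoch in range(5):
        train(exp)
        validate(exp)
    if exp.distributed:
        cleanup()

The add_scalars calls in train and validate would then need a guard like if exp.logger is not None. Would this silently drop the losses from the other ranks, or is logging from rank 0 only the standard approach here?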