I am attempting to use DistributedDataParallel for single-node, multi-GPU training on a multi-GPU SageMaker Studio instance, inside a Docker container. My entry code is as follows:
import os
from PIL import ImageFile
import torch.multiprocessing as mp
nodes, gpus = 1, 4
nr = 0  # rank of this node; there is only one node, so this is 0
world_size = nodes * gpus
# set environment variables for distributed training
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29500"
# workaround for an issue with the data
ImageFile.LOAD_TRUNCATED_IMAGES = True
# a PyTorch Dataset object which loads pairs of images for contrastive learning
# I have tried both pre-loading and lazy-loading of images, same results either way
train_ds = get_training_dataset()
# this is an nn.Module which is not (yet) wrapped in DistributedDataParallel
# model is relatively complex but does not seem to be related to the issue
# so I have cut excess code here. if necessary I can provide some more detail
model = get_model()
# "fork" as I read it works better in notebooks; "spawn" was giving an error
# "gloo" over "nccl" as the backend because NCCL was giving an error
# "veth-app0-2" as interface name was obtained from running 'ifconfig'
mp.start_processes(
training_worker_func,
nprocs=gpus,
args=(nr, gpus, world_size, train_ds, model, epochs, lr, momentum,
weight_decay, batch_size, "gloo", "veth-app0-2"),
join=True,
start_method="fork"
)
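(On the gloo/interface settings mentioned in the comments above: the kind of bare-bones check I can run to confirm that basic gloo collectives work inside this container, with no data or model involved, looks like the sketch below; _sanity_worker is just a throwaway name for illustration.)

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def _sanity_worker(rank, world_size):
    # minimal worker: initialise the gloo process group and do a single all_reduce
    os.environ["GLOO_SOCKET_IFNAME"] = "veth-app0-2"
    dist.init_process_group(backend="gloo", init_method="env://",
                            world_size=world_size, rank=rank)
    t = torch.ones(1) * rank
    dist.all_reduce(t)  # default op is SUM, so every rank should end up with 0+1+2+3 = 6
    print(f"rank {rank}: all_reduce result = {t.item()}")
    dist.destroy_process_group()

os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29500"
mp.start_processes(_sanity_worker, nprocs=4, args=(4,), join=True, start_method="fork")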
The training function is then:
import os
import torch
import torch.distributed as dist
from torch import nn
from torch.optim import SGD
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
def training_worker_func(gpu, nr, gpus, world_size, train_ds, model, epochs, lr, momentum,
                         weight_decay, batch_size, distributed_backend, network_ifname):
    # compute overall rank from node rank, GPUs, and current GPU
    rank = nr * gpus + gpu
    # set network interface environment variable
    # without this, a warning shows about being unable to detect interface and using a fallback
    # but the results are the same other than the warning
    os.environ[distributed_backend.upper() + "_SOCKET_IFNAME"] = network_ifname
    # initialise the process group for distributed processing
    dist.init_process_group(
        backend=distributed_backend,
        init_method="env://",
        world_size=world_size,
        rank=rank,
    )
    # set CUDA to the correct GPU
    torch.cuda.set_device(gpu)
    # switch model to CUDA
    # I have also tried deep-copying the model before calling cuda(), same result
    model.cuda(gpu)
    # convert model to DistributedDataParallel
    model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])
    # initialise loss function
    criterion = nn.CrossEntropyLoss().cuda(gpu)
    # initialise SGD as the optimiser
    opt = SGD(model.parameters(), lr, momentum=momentum, weight_decay=weight_decay)
    # use a distributed sampler for training
    sampler = DistributedSampler(train_ds, num_replicas=world_size, rank=rank)
    # create the training DataLoader using the sampler
    train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=False, num_workers=0,
                          pin_memory=True, sampler=sampler)
    # begin the training process
    for epoch in range(epochs):
        loss_weights = [0.1, 0.4, 0.7, 1.0]
        sum_total_loss = 0.0
        # switch to train mode
        model.train()
        print(f"GPU {gpu} beginning epoch {epoch} of {epochs}...")  # this prints for all 4 processes
        for j, images in enumerate(train_dl):
            print(f"GPU {gpu}, epoch {epoch}: {j} / {len(train_dl)}")  # this never prints for any process
            images[0] = images[0].cuda(non_blocking=True)
            images[1] = images[1].cuda(non_blocking=True)
            # compute output
            output, target = model(im_q=images[0], im_k=images[1])
            # compute loss
            losses = [criterion(output[:, i, :], target[:, i]) for i in range(12)]
            total_loss = sum(l * loss_weights[i % 4] for i, l in enumerate(losses))
            sum_total_loss += total_loss
            # compute gradient and do SGD step
            opt.zero_grad()
            total_loss.backward()
            opt.step()
        if gpu == 0:
            # only print loss if this is the main worker
            print(f"Epoch {epoch + 1} training loss: {sum_total_loss / len(train_dl)}")
    if gpu == 0:
        # only print if this is the main worker
        print("Training complete!")
When I run this, the 4 worker processes start without error and each of them prints the “GPU … beginning epoch 0 of 1…” message, but none of them ever reaches the print statement inside the DataLoader loop. There is no error or traceback; the processes simply hang.
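To narrow down whether the hang is in the data loading itself or in the DDP/forward part, one isolation test I can run is the same fork + gloo setup with no model at all, only the sampler and the DataLoader. This is just a sketch (it reuses get_training_dataset() from above; _loader_only_worker and the batch size are arbitrary):

import os

import torch.distributed as dist
import torch.multiprocessing as mp
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

def _loader_only_worker(rank, world_size, train_ds, batch_size):
    # same process group + sampler + DataLoader setup as the real worker, but no model at all
    os.environ["GLOO_SOCKET_IFNAME"] = "veth-app0-2"
    dist.init_process_group(backend="gloo", init_method="env://",
                            world_size=world_size, rank=rank)
    sampler = DistributedSampler(train_ds, num_replicas=world_size, rank=rank)
    train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=False,
                          num_workers=0, pin_memory=True, sampler=sampler)
    for j, images in enumerate(train_dl):
        # if this prints, plain data loading works in the forked workers
        print(f"rank {rank}: fetched batch {j} / {len(train_dl)}")
        if j >= 2:
            break
    dist.destroy_process_group()

os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29500"
mp.start_processes(_loader_only_worker, nprocs=4,
                   args=(4, get_training_dataset(), 8),  # batch size 8 is arbitrary for this check
                   join=True, start_method="fork")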
The images do all appear to be loaded into the instance's memory, since a sizeable chunk of RAM is in use. I don't think the problem is data-specific, because I can iterate over the same data locally without any problem.
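That local check is essentially just a plain single-process loop over the same dataset, along these lines (batch size arbitrary):

from torch.utils.data import DataLoader

train_ds = get_training_dataset()
train_dl = DataLoader(train_ds, batch_size=8, shuffle=False, num_workers=0)
for j, images in enumerate(train_dl):
    # this loop runs to completion when executed as a plain single process
    if j % 100 == 0:
        print(f"{j} / {len(train_dl)}")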
So it seems as though the data loaded on the CPU is somehow not being shared with (or not reachable from) the distributed worker processes? I am new to distributed training, so I am unsure exactly how this works internally. Any help or guidance would be appreciated.