I've been reading the official documentation on distributed training these days. I tried starting training with both mp.spawn and torch.distributed.launch, and found that mp.spawn is slower than torch.distributed.launch, mainly during data loading at the start of each epoch. For example, with torch.distributed.launch every epoch takes only 8 seconds to train, while with mp.spawn every epoch takes 17 seconds, of which the first 9 seconds are spent waiting (GPU util is 0%).
I also found that when using torch.distributed.launch I can see multiple processes with ps -ef | grep train_multi, but when using mp.spawn I can only see one process. (A minimal sketch of how each launcher starts its workers follows the environment details below.)
I don't know if I'm using it incorrectly. I hope I can get your advice. Looking forward to your reply.
Environment:
OS: CentOS 7
Python: 3.6
PyTorch: 1.7 (GPU)
CUDA: 10.1
GPU: Tesla V100
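For reference, here is a minimal sketch of how the two launchers hand the rank to a worker (train_multi.py and --nproc_per_node=4 stand in for my actual script and GPU count):

# (a) torch.distributed.launch starts the workers externally:
#       python -m torch.distributed.launch --nproc_per_node=4 train_multi.py
#     It sets MASTER_ADDR/MASTER_PORT/RANK/WORLD_SIZE in each worker's
#     environment and passes the rank via the --local_rank argument.
#
# (b) mp.spawn starts the workers from inside the script itself, and the
#     rank arrives as the first positional argument of the target function:
import torch.multiprocessing as mp

def worker(rank, world_size):
    print("worker {}/{}".format(rank, world_size))

if __name__ == '__main__':
    mp.spawn(worker, args=(4,), nprocs=4, join=True)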
The code below is from https://github.com/WZMIAOMIAO/deep-learning-for-image-processing/blob/master/pytorch_classification/train_multi_GPU/train_multi_gpu_using_spawn.py
import os
import math
import tempfile
import argparse
import torch
import torch.multiprocessing as mp
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms
from model import resnet34
from my_dataset import MyDataSet
from utils import read_split_data, plot_data_loader_image
from multi_train_utils.distributed_utils import dist, cleanup
from multi_train_utils.train_eval_utils import train_one_epoch, evaluate
def main_fun(rank, world_size, args):
    if not torch.cuda.is_available():
        raise EnvironmentError("No GPU device found for training.")

    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"

    # mp.spawn passes the process index as `rank`; use it as the GPU index too
    args.rank = rank
    args.world_size = world_size
    args.gpu = rank
    args.distributed = True

    torch.cuda.set_device(args.gpu)
    args.dist_backend = 'nccl'
    print('| distributed init (rank {}): {}'.format(
        args.rank, args.dist_url), flush=True)
    dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                            world_size=args.world_size, rank=args.rank)
    dist.barrier()

    rank = args.rank
    device = torch.device(args.device)
    batch_size = args.batch_size
    num_classes = args.num_classes
    weights_path = args.weights
    args.lr *= args.world_size  # scale the learning rate linearly with the number of processes

    if rank == 0:  # print info and set up TensorBoard in the first process only
        print(args)
        print('Start Tensorboard with "tensorboard --logdir=runs", view at http://localhost:6006/')
        tb_writer = SummaryWriter()
        if not os.path.exists("./weights"):
            os.makedirs("./weights")
    train_images_path, train_images_label, val_images_path, val_images_label = read_split_data(args.data_path)

    data_transform = {
        "train": transforms.Compose([transforms.RandomResizedCrop(224),
                                     transforms.RandomHorizontalFlip(),
                                     transforms.ToTensor(),
                                     transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
        "val": transforms.Compose([transforms.Resize(256),
                                   transforms.CenterCrop(224),
                                   transforms.ToTensor(),
                                   transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])}

    train_data_set = MyDataSet(images_path=train_images_path,
                               images_class=train_images_label,
                               transform=data_transform["train"])
    val_data_set = MyDataSet(images_path=val_images_path,
                             images_class=val_images_label,
                             transform=data_transform["val"])

    train_sampler = torch.utils.data.distributed.DistributedSampler(train_data_set)
    val_sampler = torch.utils.data.distributed.DistributedSampler(val_data_set)
    train_batch_sampler = torch.utils.data.BatchSampler(
        train_sampler, batch_size, drop_last=True)

    nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8])  # number of workers
    if rank == 0:
        print('Using {} dataloader workers every process'.format(nw))

    train_loader = torch.utils.data.DataLoader(train_data_set,
                                               batch_sampler=train_batch_sampler,
                                               pin_memory=True,
                                               num_workers=nw,
                                               collate_fn=train_data_set.collate_fn)
    val_loader = torch.utils.data.DataLoader(val_data_set,
                                             batch_size=batch_size,
                                             sampler=val_sampler,
                                             pin_memory=True,
                                             num_workers=nw,
                                             collate_fn=val_data_set.collate_fn)
    model = resnet34(num_classes=num_classes).to(device)

    # temporary checkpoint used to synchronize initial weights across processes
    # (defined up front so the cleanup at the end never sees an undefined name)
    checkpoint_path = os.path.join(tempfile.gettempdir(), "initial_weights.pt")
    if os.path.exists(weights_path):
        weights_dict = torch.load(weights_path, map_location=device)
        # keep only weights whose shapes match the current model
        load_weights_dict = {k: v for k, v in weights_dict.items()
                             if model.state_dict()[k].numel() == v.numel()}
        model.load_state_dict(load_weights_dict, strict=False)
    else:
        # save the rank-0 weights and load them everywhere, so all processes
        # start from identical parameters
        if rank == 0:
            torch.save(model.state_dict(), checkpoint_path)
        dist.barrier()
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))

    if args.freeze_layers:
        # only train the fully-connected layer
        for name, para in model.named_parameters():
            if "fc" not in name:
                para.requires_grad_(False)
    else:
        if args.syncBN:
            # convert BatchNorm to SyncBatchNorm to share statistics across GPUs
            model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)

    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])

    # optimizer
    pg = [p for p in model.parameters() if p.requires_grad]
    optimizer = optim.SGD(pg, lr=args.lr, momentum=0.9, weight_decay=0.005)
    # Scheduler https://arxiv.org/pdf/1812.01187.pdf
    lf = lambda x: ((1 + math.cos(x * math.pi / args.epochs)) / 2) * (1 - args.lrf) + args.lrf  # cosine
    scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lf)
    for epoch in range(args.epochs):
        # make the DistributedSampler shuffle differently at each epoch
        train_sampler.set_epoch(epoch)

        mean_loss = train_one_epoch(model=model,
                                    optimizer=optimizer,
                                    data_loader=train_loader,
                                    device=device,
                                    epoch=epoch)

        scheduler.step()

        sum_num = evaluate(model=model,
                           data_loader=val_loader,
                           device=device)
        acc = sum_num / val_sampler.total_size

        if rank == 0:
            print("[epoch {}] accuracy: {}".format(epoch, round(acc, 3)))
            tags = ["loss", "accuracy", "learning_rate"]
            tb_writer.add_scalar(tags[0], mean_loss, epoch)
            tb_writer.add_scalar(tags[1], acc, epoch)
            tb_writer.add_scalar(tags[2], optimizer.param_groups[0]["lr"], epoch)
            torch.save(model.state_dict(), "./weights/model-{}.pth".format(epoch))

    # remove the temporary checkpoint
    if rank == 0:
        if os.path.exists(checkpoint_path):
            os.remove(checkpoint_path)

    cleanup()
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--num_classes', type=int, default=5)
    parser.add_argument('--epochs', type=int, default=30)
    parser.add_argument('--batch-size', type=int, default=16)
    parser.add_argument('--lr', type=float, default=0.001)
    parser.add_argument('--lrf', type=float, default=0.1)
    parser.add_argument('--syncBN', type=bool, default=True)

    # http://download.tensorflow.org/example_images/flower_photos.tgz
    parser.add_argument('--data-path', type=str, default="/home/wz/data_set/flower_data/flower_photos")

    # https://download.pytorch.org/models/resnet34-333f7ec4.pth
    parser.add_argument('--weights', type=str, default='resNet34.pth',
                        help='initial weights path')
    parser.add_argument('--freeze-layers', type=bool, default=False)
    parser.add_argument('--device', default='cuda', help='device id (i.e. 0 or 0,1 or cpu)')
    parser.add_argument('--world-size', default=4, type=int,
                        help='number of distributed processes')
    parser.add_argument('--dist-url', default='env://', help='url used to set up distributed training')
    opt = parser.parse_args()

    # spawn opt.world_size processes, each running main_fun(rank, world_size, args)
    mp.spawn(main_fun,
             args=(opt.world_size, opt),
             nprocs=opt.world_size,
             join=True)
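For comparison, when I start the same training with torch.distributed.launch, the initialization reads everything from the environment instead. A minimal sketch of that init path (the actual launch script in the same repo may differ in details):

import os
import torch
import torch.distributed as dist

def init_distributed_mode(args):
    # torch.distributed.launch exports RANK and WORLD_SIZE for every worker;
    # the local GPU index arrives as --local_rank (or as the LOCAL_RANK
    # environment variable when the launcher is given --use_env)
    args.rank = int(os.environ["RANK"])
    args.world_size = int(os.environ["WORLD_SIZE"])
    args.gpu = int(os.environ.get("LOCAL_RANK", args.rank))
    torch.cuda.set_device(args.gpu)
    dist.init_process_group(backend="nccl", init_method="env://",
                            world_size=args.world_size, rank=args.rank)
    dist.barrier()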