When to call DataLoaders for DistributedDataParallel

Hello

I have a single node with 8 GPUs and am interested in using DistributedDataParallel (DDP) to distribute data processing across the GPU devices. I have two functions (I’m basing these partly on this demo):

def main(local_world_size, local_rank, args):
    # These are the parameters used to initialize the process group
    env_dict = {
        key: os.environ[key]
        for key in ("MASTER_ADDR", "MASTER_PORT", "RANK", "WORLD_SIZE")
    }
    print(f"[{os.getpid()}] Initializing process group with: {env_dict}")
    dist.init_process_group(backend="nccl")
    print(
        f"[{os.getpid()}] world_size = {dist.get_world_size()}, "
        + f"rank = {dist.get_rank()}, backend={dist.get_backend()}"
    )

    # Train the model (use the arguments passed into main)
    train(local_world_size, local_rank, args)

    # Tear down the process group
    dist.destroy_process_group()

and

DATASET_MAP = {'D1': D1_Dataset,
               'D2': D2_Dataset}

def train(local_world_size, local_rank, args):

    # Partition the visible GPUs evenly among the local processes on this node.
    n = torch.cuda.device_count() // local_world_size
    device_ids = list(range(local_rank * n, (local_rank + 1) * n))

    print(
        f"[{os.getpid()}] rank = {dist.get_rank()}, "
        + f"world_size = {dist.get_world_size()}, n = {n}, device_ids = {device_ids}"
    )

    root_dir = f"/mnt/efs/ml_data/{args.dataset}/"
    datasets = {subset: DATASET_MAP[args.dataset](root=root_dir,
                                                  data_subset=subset,
                                                  in_memory=False) \
                for subset in ['train','val']}

    # Shard the training set across ranks: each rank draws a distinct subset of indices.
    training_sampler = DistributedSampler(datasets['train'], rank=local_rank, shuffle=True)
    loaders = {'train': DataLoader(datasets['train'],
                                   batch_size=args.batch_size,
                                   num_workers=4,
                                   sampler=training_sampler),
               'val': DataLoader(datasets['val'],
                                 batch_size=args.batch_size,
                                 num_workers=4)}

    model = networks.Network(
        raw_dim=args.num_classes,
        hidden_dim=args.hidden_dim,
        output_size=args.output_dim
    )
    model = model.cuda(device_ids[0])
    ddp_model = DDP(model, device_ids)

    for epoch in torch.arange(0, args.num_epochs):
        print(f"Epoch: {epoch}.")
        for i, graph in enumerate(loaders['train']):

            graph = graph.cuda(local_rank)
            output = ddp_model(graph)
            print(output.device)

and finally

if __name__ == "__main__":

    args = parse_args()

    main(args.local_world_size, args.local_rank, args)

My training and validation datasets are quite large, so I am loading batches at call time (when iterating over the DataLoader items) rather than up front when creating the custom Dataset objects. The in_memory=False parameter of the custom Dataset class ensures that the __getitem__ method does the loading, not the __init__ method.
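Roughly, the lazy-loading pattern looks like this (a simplified, hypothetical sketch; the real class and file layout differ):

import os
import torch
from torch.utils.data import Dataset

class LazyDiskDataset(Dataset):
    # Hypothetical sketch of the in_memory=False idea: __init__ only records
    # file paths, and __getitem__ loads a single sample from disk on demand.
    def __init__(self, root, data_subset, in_memory=False):
        subset_dir = os.path.join(root, data_subset)
        self.files = sorted(os.path.join(subset_dir, f) for f in os.listdir(subset_dir))
        self.in_memory = in_memory

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        # Load lazily on the CPU; the training loop moves the batch to a GPU later.
        return torch.load(self.files[idx], map_location="cpu")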

The downside of this approach is that samples are redundantly loaded every epoch. At which point in the distributed training should the datasets be created? Should I create them in main, or continue to do so in train? Because the train/val datasets are large, I want to make sure I am not putting each whole dataset onto each individual GPU device.

Does this make sense?

Thanks for your help.

Kristian

I have found this thread with a comment where @ptrblck suggests loading the data into CPU memory first and then, at batch time, moving each batch to the GPU. How can I go about doing this within the DDP framework?

Thanks.

Kristian

You can create the Dataset with its corresponding DistributedSampler in each rank in the train method and just iterate it. Once the samples are loaded, you can directly push them to the GPU using the same rank argument.
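In code, this amounts to roughly the following sketch (reusing DATASET_MAP and args from the first post; the set_epoch call is not in the original code, but is the usual way to reshuffle the per-rank split each epoch):

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

def train(local_world_size, local_rank, args):
    # Each rank builds its own Dataset and sampler; only indices assigned to this
    # rank are actually loaded when iterating the DataLoader.
    dataset = DATASET_MAP[args.dataset](root=f"/mnt/efs/ml_data/{args.dataset}/",
                                        data_subset='train',
                                        in_memory=False)
    sampler = DistributedSampler(dataset, rank=local_rank, shuffle=True)
    loader = DataLoader(dataset, batch_size=args.batch_size,
                        num_workers=4, sampler=sampler)

    for epoch in range(args.num_epochs):
        sampler.set_epoch(epoch)            # reshuffle the per-rank split each epoch
        for batch in loader:
            batch = batch.cuda(local_rank)  # push the loaded batch to this rank's GPU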


Hi @ptrblck

Thanks for the info – how would that be different from the approach above?

training_sampler = DistributedSampler(datasets['train'], rank=local_rank, shuffle=True)
loaders = {'train': DataLoader(datasets['train'],
                               batch_size=args.batch_size,
                               num_workers=4,
                               sampler=training_sampler)}
.
.
.
for epoch in torch.arange(0, args.num_epochs):
    print(f"Epoch: {epoch}.")
    for i, graph in enumerate(loaders['train']):
        graph = graph.cuda(local_rank)

Does this approach seem correct?

Thanks!

Kristian

When run from the command line, the script above generates the following output:

Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
*****************************************
[349474] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '7', 'WORLD_SIZE': '8'}
[349467] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '0', 'WORLD_SIZE': '8'}
[349470] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '3', 'WORLD_SIZE': '8'}
[349471] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '4', 'WORLD_SIZE': '8'}
[349472] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '5', 'WORLD_SIZE': '8'}
[349468] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '1', 'WORLD_SIZE': '8'}
[349469] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '2', 'WORLD_SIZE': '8'}
[349473] Initializing process group with: {'MASTER_ADDR': '127.0.0.1', 'MASTER_PORT': '29500', 'RANK': '6', 'WORLD_SIZE': '8'}
[349470] world_size = 8, rank = 3, backend=nccl
[349474] world_size = 8, rank = 7, backend=nccl
[349470] rank = 3, world_size = 8, n = 1, device_ids = [3]
[349474] rank = 7, world_size = 8, n = 1, device_ids = [7]
[349468] world_size = 8, rank = 1, backend=nccl
[349468] rank = 1, world_size = 8, n = 1, device_ids = [1]
[349467] world_size = 8, rank = 0, backend=nccl
[349467] rank = 0, world_size = 8, n = 1, device_ids = [0]
[349469] world_size = 8, rank = 2, backend=nccl[349473] world_size = 8, rank = 6, backend=nccl

[349469] rank = 2, world_size = 8, n = 1, device_ids = [2]
[349473] rank = 6, world_size = 8, n = 1, device_ids = [6]
[349471] world_size = 8, rank = 4, backend=nccl
[349472] world_size = 8, rank = 5, backend=nccl
[349471] rank = 4, world_size = 8, n = 1, device_ids = [4]
[349472] rank = 5, world_size = 8, n = 1, device_ids = [5]
Loaded 200 out of 200 samples in 26.96 seconds.
Loaded 200 out of 200 samples in 26.99 seconds.
Loaded 200 out of 200 samples in 27.09 seconds.
Loaded 200 out of 200 samples in 27.12 seconds.
Loaded 200 out of 200 samples in 27.19 seconds.
Loaded 200 out of 200 samples in 27.31 seconds.
Loaded 200 out of 200 samples in 27.77 seconds.
Loaded 200 out of 200 samples in 28.12 seconds.

Am I correct in concluding that each process (and hence each GPU device) is loading the entire dataset? Is there a way to load the dataset once in the main process and then pass it to each device, or would this not really add a performance benefit? In the custom Dataset class, I’m now loading each data sample to the CPU in the __init__ and __getitem__ methods via data.cpu(), and then moving each batch to a CUDA device via batch.cuda() when iterating over the data loader.
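As a hypothetical sanity check (a sketch I could drop into train after building the sampler), the length of a DistributedSampler is the number of indices assigned to the current rank, so each rank should only iterate over roughly len(dataset) / world_size samples even though every process constructs the full Dataset object:

# Hypothetical check, placed inside train() after the sampler is created.
print(f"[{os.getpid()}] rank {dist.get_rank()}: "
      f"sampler yields {len(training_sampler)} of {len(datasets['train'])} samples")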

Any help is appreciated.

Thanks.

Kristian

The way you use it looks good to me. Just one quick thing here.
Have you specified num_replicas for the DistributedSampler?

Hi @fduwjj

I haven’t explicitly specified this parameter. From looking at this documentation, it seems that if num_replicas is not specified, it is determined internally from the distributed group size. At run time, I launch with the following command-line arguments:

command="python -m 'torch.distributed.launch'"
command+=" --nnode=${NNODE}"
command+=" --node_rank=${NODE_RANK}"
command+=" --nproc_per_node=${NPROC_PER_NODE}"
...
...
...
eval ${command}

where NNODE=1, NODE_RANK=0, and NPROC_PER_NODE=8 – that is, one machine with 8 graphics cards. Does this seem like a reasonable approach, or should I explicitly define the num_replicas parameter for the DistributedSampler?
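For reference, passing the values explicitly would look like the following sketch, which (once the process group is initialized) should be equivalent to the defaults:

import torch.distributed as dist
from torch.utils.data.distributed import DistributedSampler

training_sampler = DistributedSampler(
    datasets['train'],
    num_replicas=dist.get_world_size(),  # 8 here: 1 node x 8 processes
    rank=dist.get_rank(),
    shuffle=True,
)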

Thanks.

Kristian

Got it. Then it should work? cc: @VitalyFedyunin