How to parallelize a loop over the samples of a batch

Just to clarify, are you saying that what I am trying to do cannot be done with nn.parallel.DistributedDataParallel (DistributedDataParallel — PyTorch master documentation)?

No, DDP would be another alternative. But it won't give you the flexibility of passing an arbitrary function that computes what you want on different processes: you will need to give it an nn.Module with your model, which might not be a restriction in your case.
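For illustration, one way around that restriction is to wrap the arbitrary computation inside an nn.Module's forward so DDP has something it can hook into. A minimal sketch (the Wrapper class and compute_loss function here are made up for this example, not part of DDP):

import torch
from torch import nn

def compute_loss(model, example):
    # placeholder for whatever arbitrary per-example computation you need
    return model(example).sum()

class Wrapper(nn.Module):
    """Exposes an arbitrary computation to DDP by running it inside forward()."""
    def __init__(self, model):
        super().__init__()
        self.model = model

    def forward(self, examples):
        # run the arbitrary computation inside forward, one loss per example
        losses = [compute_loss(self.model, ex) for ex in examples]
        return torch.stack(losses).mean()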

1 Like

Ah, ok, thanks! I appreciate the pointer to the other docs; I will check them as well!

My current goal is to give a chunk of batches, or single examples in a list, to a model that is shared across processes, then do the optimizer step once all the losses have been computed (like in my example). I will have to read up on DDP to see how that's done there (note that one of my models takes in a list of examples while the other takes a tensor). I will try to parallelize both.

——

I basically want to translate my self-contained example to the right PyTorch parallelization.

@albanD I was reading the API docs for DistributedDataParallel — PyTorch 2.1 documentation and it says this (which worries me for my application):

This container parallelizes the application of the given module by splitting the input across the specified devices by chunking in the batch dimension

The chunking is problematic. My models do not receive tensors; they receive special parsed objects that I need to process. Batching these is not easy, which is why I am resorting to distributed training in the first place. Let's assume I am right that batching is hard. If that is the case, how would DDP “chunk” my list of Python objects? Would it work at all?

e.g.

def demo_basic(rank, world_size):
    # setup(), cleanup() and ToyModel are as defined in the DDP tutorial (full listing below)
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    x = torch.randn(20, 10)  # imagine we change this to a list [ obj1, ..., obj_batch_size]
    outputs = ddp_model(x)
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()

What would happen if x is a list of objects instead?

If you give a list, AFAIK, it will slice it, so if it is a list of 10 elements and you split that over 5 processes, each process will get 2 elements from the list.
So x[0:2] would go to the first process, x[2:4] to the second, etc.
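If you end up sharding the list yourself per process (which is how a DDP process typically prepares its own inputs anyway), a minimal sketch of that slicing (shard_for_rank is a made-up helper, not a PyTorch function):

def shard_for_rank(objects, rank, world_size):
    # split a list of arbitrary Python objects into contiguous chunks,
    # one chunk per process; e.g. 10 objects over 5 processes -> 2 each
    chunk_size = (len(objects) + world_size - 1) // world_size
    return objects[rank * chunk_size:(rank + 1) * chunk_size]

# e.g. rank 0 gets x[0:2], rank 1 gets x[2:4], ...
x = list(range(10))
assert shard_for_rank(x, 0, 5) == [0, 1]
assert shard_for_rank(x, 1, 5) == [2, 3]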

1 Like

Cool! Will test it out, thanks AlbanD!

But the above example is confusing. How does the demo_basic code actually receive a batch? Is there an example with an actual data loader? @albanD


Update: perhaps what I need is an example with a dataloader… it seems this page has one: Distributed data parallel training in Pytorch

Well, usually the whole block between optimizer.zero_grad() and optimizer.step() sits inside a for-loop that draws samples x from a dataset to perform the training.
So this example is missing the training loop that would be fetching the batch.
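A minimal sketch of that loop (train_loader is just a stand-in name for whatever iterable yields the batches):

for x, labels in train_loader:           # the loop over batches that the demo omits
    optimizer.zero_grad()
    outputs = ddp_model(x)
    loss_fn(outputs, labels).backward()  # DDP synchronizes gradients during backward
    optimizer.step()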

@albanD is the right way to think about it, then, that ddp_model(x) is what “blocks” so that the next lines are executed with the right coordinated gradients? (The strange thing is that the demo_basic code I pasted is usually the function that each process executes on its own, which is weird, because then how do you make sure each process samples the same x if they each have their own dataloader? I am assuming that the demo_basic(rank, world_size) code shown is not the actual code that runs and that it gets turned into some other magic that makes it work properly.)

Thanks for the help!

Source of example: Getting Started with Distributed Data Parallel — PyTorch Tutorials 1.7.1 documentation

import torch
import torch.multiprocessing as mp
from torch import nn, optim
from torch.nn.parallel import DistributedDataParallel as DDP

# setup() and cleanup() are the init_process_group / destroy_process_group
# helpers from the DDP tutorial (see the full self-contained example further down)


class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

I am really not an expert on how to use DDP, but from what I understand, each process has its own dataset/dataloader and they use a distributed sampler to make sure they don't sample the same thing: torch.utils.data — PyTorch 1.7.1 documentation
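A minimal sketch of that pattern (dataset, rank, world_size, batch_size and num_epochs are assumed to exist, as in the examples above):

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# each rank sees a disjoint shard of the dataset
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
train_loader = DataLoader(dataset, batch_size=batch_size, sampler=sampler)

for epoch in range(num_epochs):
    sampler.set_epoch(epoch)   # so the shuffling differs between epochs
    for x, y in train_loader:
        ...                    # forward / backward / step as in the loop above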

2 Likes

I know you said you are not an expert with DDP (if you know who is, can you forward my question below?), but when using CPUs only, how do I pass the ranks to DDP? If I try a modified version of the basic demo, it complains that there are no GPUs, so the examples seem to be meant for GPUs, not CPUs (DDP interprets the rank as a GPU id or something).

In summary, when I create the DDP model, which of the following is correct (for multiple CPUs):

    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()

or

    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel()

for it to work properly on multiple CPUs?

Note that usually one does # ddp_model = DDP(model, device_ids=[rank]) for GPUs, to move the model to the GPU with that rank (at least that's what my comment says).

Whole self-contained example (without dataloaders… that was my next step):

"""
Based on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

Note: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use
different communication backends and are not restricted to being executed on the same machine.
"""
import torch
from torch import nn, optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

import os

num_epochs = 5
batch_size = 8
Din, Dout = 10, 5
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
data = [(i*data_x, i*data_y) for i in range(num_epochs)]

class OneDeviceModel(nn.Module):
    """
    Toy example for a model run in parallel but not distributed across GPUs
    (only processes, each with its own GPU or hardware).
    """
    def __init__(self):
        super().__init__()
        self.net1 = nn.Linear(Din, Din)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(Din, Dout)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))

def setup_process(rank, world_size, backend='gloo'):
    """
    Initialize the distributed environment (for each process).

    gloo is a collective communications library (https://github.com/facebookincubator/gloo). My understanding is that
    it's a library/API for processes to communicate/coordinate with each other and with the master. It's a backend library.
    """
    # set up the master's ip address so this child process can coordinate
    # os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
    if torch.cuda.is_available():
        backend = 'nccl'
    # Initializes the default distributed process group, and this will also initialize the distributed package.
    dist.init_process_group(backend, rank=rank, world_size=world_size)

def cleanup():
    """ Destroy a given process group, and deinitialize the distributed package """
    dist.destroy_process_group()

def run_parallel_training_loop(rank, world_size):
    """
    Distributed function to be implemented later.

    This is the function that is actually ran in each distributed process.

    Note: as DDP broadcasts model states from the rank 0 process to all other processes in the DDP constructor,
    you don't need to worry about different DDP processes starting from different initial model parameter values.
    """
    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    setup_process(rank, world_size)

    # create model and move it to GPU with id rank
    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()
    # ddp_model = DDP(model, device_ids=[rank])
    ddp_model = DDP(model)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    for batch_idx, batch in enumerate(data):
        x, y = batch

        optimizer.zero_grad()
        outputs = ddp_model(x)
        labels = y.to(rank) if torch.cuda.is_available() else y
        # Gradient synchronization communications take place during the backward pass and overlap with the backward computation.
        loss_fn(outputs, labels).backward()  # When backward() returns, param.grad already contains the synchronized gradient tensor.
        optimizer.step()  # TODO how does the optimizer know to do the gradient step only once?

    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # Destroy a given process group, and deinitialize the distributed package
    cleanup()

def main():
    print()
    print('running main()')
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # args
    world_size = mp.cpu_count()
    mp.spawn(run_parallel_training_loop, args=(world_size,), nprocs=world_size)

if __name__ == "__main__":
    print('starting __main__')
    main()
    print('Done!\a\n')

How is the RPC framework different from what this tutorial shows (Writing Distributed Applications with PyTorch — PyTorch Tutorials 1.7.1 documentation)? That uses send/recv, all_reduce, etc… There are so many options that this is confusing and frustrating. I don't understand why it can't just do rpc.map or something for me and allow me to use gradients (I understand the error). Why can't something like

with Pool(100) as pool:
    losses = pool.map(forward, batch)
    torch.stack(losses).mean().backward()
optimizer.step()

just work…?


RPC example: python - How does one implement parallel SGD with the pytorch autograd RPC library so that gradients can be received from different processes without errors? - Stack Overflow


My intention was to parallelize meta-learning with torchmeta + higher, but it seems that path is dead using DDP until higher is incorporated into the core of PyTorch. See:

But the RPC path might not be dead:

cc @mrshenli in case he has more ideas about how these would work with DDP.

1 Like

Hi @mrshenli, I made a DDP example and managed to parallelize it with DDP. But I have access to 110 CPUs, not GPUs, and the single-process code is faster than the 10- and 110-CPU-process versions. Why is that? Is there something I can do to fix this?

Hey @Brando_Miranda, @albanD, sorry for being late to this discussion.

I made a DDP example and managed to parallelize it with DDP. But I have access to 110 CPUs, not GPUs, and the single-process code is faster than the 10- and 110-CPU-process versions. Why is that? Is there something I can do to fix this?

Are those CPUs/machines or CPU cores? IIRC, PyTorch operators already parallelize across multiple CPU cores (@albanD please correct me if I am wrong).

If it's multiple cores on the same machine, how did you make sure that each DDP process exclusively operates on its own set of CPU cores?
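One way to do that on Linux (a sketch only; the cores-per-rank split is made up and os.sched_setaffinity is Linux-specific):

import os
import torch

def pin_process_to_cores(rank, cores_per_rank=4):
    # restrict this process to its own slice of cores so DDP workers
    # don't fight over the same ones
    cores = range(rank * cores_per_rank, (rank + 1) * cores_per_rank)
    os.sched_setaffinity(0, cores)
    # also cap the intra-op thread pool so PyTorch doesn't oversubscribe
    torch.set_num_threads(cores_per_rank)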

Another thing is that DDP's CPU communication overhead might be overshadowing the compute parallelization speedup. The gradient synchronization communication overhead is roughly constant and independent of the batch size. One helpful exercise might be to increase the batch size and see whether the gap shrinks.

Just to confirm: when using DDP on 10 CPUs, did you set the per-process batch size to 1/10 of the batch size used in local training?
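For example (with made-up numbers):

# hypothetical numbers: single-process (local) training used a batch of 80
global_batch_size = 80
world_size = 10                                             # number of DDP processes
per_process_batch_size = global_batch_size // world_size    # -> 8 per process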

I assume you have already figured this out. :slight_smile: Responding here for future users. This can be done by setting device_ids to None or an empty list (see the description of device_ids in the API doc).
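Concretely, a minimal sketch of the two constructions (reusing OneDeviceModel and DDP from the example above; note that DDP broadcasts rank 0's parameters in its constructor, so the .share_memory() call from the question should not be needed):

if torch.cuda.is_available():
    # GPU: move the model to this rank's device and tell DDP which device it owns
    model = OneDeviceModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])
else:
    # CPU: keep the model on CPU; device_ids defaults to None, which is what CPU needs
    model = OneDeviceModel()
    ddp_model = DDP(model)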

How is the RPC framework different from what this tutorial shows (Writing Distributed Applications with PyTorch — PyTorch Tutorials 1.7.1 documentation)? That uses send/recv, all_reduce, etc… There are so many options that this is confusing and frustrating.

Hope this can help: PyTorch Distributed Overview — PyTorch Tutorials 2.1.1+cu121 documentation

The first version of RPC is actually built on top of send/recv. To make distributed model parallelism easier, it also provides features like distributed autograd (so that you don't need to manually handle backward gradients using send/recv), Remote Reference (so that you can share a remote object without copying the real data), distributed optimizer (so that you don't need to manually call opt.step() on every participating process), etc.
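As a rough sketch of how those pieces fit together (a toy example, not the poster's model: the worker names and the matmul are made up, and it assumes rpc.init_rpc has already been called on both "worker0" and "worker1"):

import torch
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc
from torch import optim
from torch.distributed.optim import DistributedOptimizer
from torch.distributed.rpc import RRef

# runs on "worker0" after rpc.init_rpc(...) has been called on every worker
w = torch.rand(3, 3, requires_grad=True)    # a parameter owned by this worker
x = torch.rand(3, 3)                        # a toy input batch
dist_optim = DistributedOptimizer(optim.SGD, [RRef(w)], lr=0.001)

with dist_autograd.context() as context_id:
    # the matmul runs on worker1; distributed autograd records the
    # cross-worker graph so the gradient can flow back to w
    loss = rpc.rpc_sync("worker1", torch.matmul, args=(x, w)).sum()
    dist_autograd.backward(context_id, [loss])   # grads are stored per context
    dist_optim.step(context_id)                  # updates w using those grads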

1 Like

I was leaving it empty, but yes, None is enough. I can't remember whether I read that comment in the PyTorch code directly or in the docs when I was doing it.

The issue I am facing right now, however, is that even though I have 112 cores (or CPUs, I am not sure of the difference) available, my serial code is actually faster…?! Not sure if you had that issue or if anyone knows a solution.

Wow, that is really useful… I wish I had seen this when it was most relevant to me. If I get back to this issue I will check exactly whether I have cores, CPUs, etc. and report back here to you. Thanks for your help. Have you been able to speed up your code at all with multiple CPUs? How did you do it?

I recall doing some similar experiments before, and noticed that even if I run serial PyTorch code, it can still make all my 24 CPU cores busy. I guess through OMP/parallelized for-loops? @albanD would know more :slight_smile:

But if you have two machines and if communication is not the main bottleneck, I would assume CPU DDP can still help.

Low-level libraries that we use (OpenBLAS, MKL, etc.) all do multithreading under the hood. So yes, for CPU compute, there is no need for multiprocessing.
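For reference, a quick way to inspect or cap that intra-op threading (a sketch; mainly useful if you do decide to run one DDP process per core):

import torch

print(torch.get_num_threads())  # size of the intra-op thread pool (often = number of cores)
torch.set_num_threads(1)        # cap it, e.g. when running one DDP process per core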