How to parallelize a loop over the samples of a batch

Hi! I’ve been looking into parallelizing operations for different PyTorch workloads. On the model level - e.g. to train on several GPUs - this appears to be fairly straightforward, and there are plenty of good tutorials out there.

However, I have been trying to parallelize an operation where I split a batch tensor and operate on each of the individual samples, like so (this is just a minimal working example - there is an actual reason for me to split the batch):

import torch
import torch.nn as nn
import torch.multiprocessing as mp

mp.set_start_method("spawn")


class Model(nn.Module):
    def __init__(self):
        nn.Module.__init__(self)
        self.lin1 = nn.Linear(100, 100)
        self.lin2 = nn.Linear(100, 30)
        self.lin3 = nn.Linear(30, 3)

    def forward_single(self, x):
        # just a dummy method
        return self.lin2(x)

    def forward(self, xs):
        step1 = self.lin1(xs)
        step2 = []
        # ---- I would really like to parallelize the following loop
        for x in torch.split(step1, 1):
            step2.append(self.forward_single(x))
        # ----
        step2 = torch.cat(step2, dim=0)
        ys = self.lin3(step2)
        return ys


if __name__ == '__main__':
    input = torch.ones(64, 100).cuda()
    target = torch.ones(64, 3).cuda()

    model = Model()
    model.cuda()
    output = model(input)
    loss_func = nn.MSELoss()
    loss = loss_func(output, target)
    loss.backward()
    print(output[0])

However, any approach I have tried results in the same error message: RuntimeError: Cowardly refusing to serialize non-leaf tensor which requires_grad, since autograd does not support crossing process boundaries. If you just want to transfer the data, call detach() on the tensor before serializing (e.g., putting it on the queue).

So: (1) Can I actually do this? The error message sounds as if I may not be able to parallelize that loop at all. (2) If I can, how? I’d take any dirty workaround, as my code is really pretty slow right now …

Thank you!

Hi,

I would advise first trying to write your code without the for loop, using PyTorch ops. Even if that needs extra computations/complex indexing, it is most likely going to be much faster.

If your forward for a single sample is very small, then the GPU is going to be very inefficient at running it, even if you parallelize the work of launching the jobs on the GPU.

You are not allowed to send tensors that require gradients to another process, as it is very hard to track them and backpropagate properly through such a move. You would need to detach your tensors and manually send the data back and forth.
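For illustration, here is a minimal sketch (not from this thread; the toy computation and queue names are made up) of what “detach and manually send data back and forth” can look like with torch.multiprocessing: the child returns its detached result together with the gradient of that result w.r.t. its input, and the parent stitches the chain back together with backward(gradient=...):

import torch
import torch.multiprocessing as mp

def worker(in_q, out_q):
    # child: receives a detached tensor, runs its piece of the computation,
    # and sends back both the result and d(result)/d(input) so the parent
    # can continue backprop by hand
    x = in_q.get().clone().requires_grad_(True)
    y = (x * 3).sum()
    y.backward()
    out_q.put((y.detach(), x.grad))

if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)
    in_q, out_q = mp.Queue(), mp.Queue()
    p = mp.Process(target=worker, args=(in_q, out_q))
    p.start()

    w = torch.randn(4, requires_grad=True)
    h = w * 2                 # non-leaf: cannot be sent across processes as-is
    in_q.put(h.detach())      # detach before crossing the process boundary
    y, grad_h = out_q.get()
    p.join()

    h.backward(gradient=grad_h)   # stitch the two graphs back together by hand
    print(w.grad)                 # every element is 2 * 3 = 6

This only shows the plumbing for a single tensor; doing the same for whole submodules (parameters included) is exactly the bookkeeping that makes this approach painful.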

Hi,
thank you for your response! Unfortunately, I cannot write the code without the loop (believe me, I’ve tried - I am well aware of the performance hit I take there). From the look of things, I cannot parallelize that loop while maintaining gradients, correct?
Thanks!

This may seem like a very stupid question, but why don’t you use DataParallel?
As far as I can see, you want to apply the same operation (self.lin2) to all the samples in the batch.

You can set up DataParallel inside the nn.Module, so what did I miss?

Have you looked at what “im2col” does? It’s non-trivial.

You can do it, but it’s going to be tricky and not really useful, as you are in the worst possible case: many small ops that run on the GPU. What will cost the most for you is actually running the Python code and sending jobs to the GPU. To get even a small gain, you would need to use multiple processes, which are not supported by autograd out of the box, so you will need to unpack/pack Tensors when sending them to make sure it works.
So yes, you can parallelize the loop while maintaining gradients, but it’s going to be hard and I don’t think you’re going to see any big improvement by doing so.

Hi!
First - thank you for your responses. @JuanFMontesinos - that is just a minimal working example - in reality, the code has different computation steps for different samples (so some form of conditional computation model). Now that I know that parallelizing that loop is not possible (@albanD - thank you here, I understand your argument), I’ve instead settled for the following solution (in case anybody else ever stumbles across a similar problem):

class Model(nn.Module):
    def __init__(self):
        nn.Module.__init__(self)
        self.lin1 = nn.Linear(100, 100)
        self.lin2 = nn.Linear(100, 30)
        self.lin3 = nn.Linear(30, 3)

    def forward(self, xs):
        step1 = self.lin1(xs)
        results = torch.zeros(*results_shape).to(xs.device)  # results_shape is (batch_size, 30) here
        # computing different masks for different computational steps
        mask1 = some_masking_function1(xs)
        results[mask1] = some_computation1(xs[mask1])
        mask2 = some_masking_function2(xs)
        results[mask2] = some_computation2(xs[mask2])
        ...
        maskn = some_masking_functionn(xs)
        results[maskn] = some_computationn(xs[maskn])
        results = self.lin3(results)
        return results

This solution effectively “clusters” samples by which computation I want to perform, and then batches over all these samples. As I have potentially many computation steps, this approach only sped me up by a factor of 1.5-1.8 - but I’ll take it!
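For reference, here is a fully runnable sketch of that clustering/masking pattern with just two hypothetical branches (the routing rule, layer sizes and branch modules are made up for illustration); gradients flow through the masked assignments into both branches:

import torch
import torch.nn as nn

class MaskedModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.lin1 = nn.Linear(100, 100)
        self.branch_a = nn.Linear(100, 30)   # stand-in for some_computation1
        self.branch_b = nn.Linear(100, 30)   # stand-in for some_computation2
        self.lin3 = nn.Linear(30, 3)

    def forward(self, xs):
        step1 = self.lin1(xs)
        results = torch.zeros(xs.shape[0], 30, device=xs.device, dtype=step1.dtype)
        # route each sample to one branch (illustrative rule), then batch per branch
        mask_a = step1.mean(dim=1) > 0
        mask_b = ~mask_a
        if mask_a.any():
            results[mask_a] = self.branch_a(step1[mask_a])
        if mask_b.any():
            results[mask_b] = self.branch_b(step1[mask_b])
        return self.lin3(results)

model = MaskedModel()
out = model(torch.randn(64, 100))   # (64, 3)
out.sum().backward()                # both branches receive gradients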

Thank you again.

I want to do this too, but my situation is that I only have CPUs (I have up to 112). I tried it, but I also get the same error. How do I get around it? @albanD I made a totally self-contained example:


import torch
import torch.nn as nn
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import Dataset, DataLoader

from torch.multiprocessing import Pool

class SimpleDataSet(Dataset):

    def __init__(self, Din, num_examples=23):
        self.x_dataset = [torch.randn(Din) for _ in range(num_examples)]
        # target function is x*x
        self.y_dataset = [x**2 for x in self.x_dataset]

    def __len__(self):
        return len(self.x_dataset)

    def __getitem__(self, idx):
        return self.x_dataset[idx], self.y_dataset[idx]

def get_loss(args):
    x, y, model = args
    y_pred = model(x)
    criterion = nn.MSELoss()
    loss = criterion(y_pred, y)
    return loss

def get_dataloader(D, num_workers, batch_size):
    ds = SimpleDataSet(D)
    dl = DataLoader(ds, batch_size=batch_size, num_workers=num_workers)
    return dl

def train_fake_data():
    num_workers = 2
    Din, Dout = 3, 1
    model = nn.Linear(Din, Dout).share_memory()

    optimizer = torch.optim.Adam(model.parameters(), lr=0.1)

    batch_size = 2
    num_epochs = 10
    # num_batches = 5
    num_procs = 5
    dataloader = get_dataloader(Din, num_workers, batch_size)
    scheduler = StepLR(optimizer, step_size=1, gamma=0.7)
    for epoch in range(num_epochs):
        for _, batch in enumerate(dataloader):
            batch = [(torch.randn(Din), torch.randn(Dout), model) for _ in batch]
            with Pool(num_procs) as pool:
                optimizer.zero_grad()

                losses = pool.map(get_loss, batch)
                loss = torch.mean(torch.stack(losses))  # stack the per-sample losses before averaging
                loss.backward()

                optimizer.step()
            # scheduler
            scheduler.step()


if __name__ == '__main__':
    # start = time.time()
    # train()
    train_fake_data()
    # print(f'execution time: {time.time() - start}')

Error:

Traceback (most recent call last):
  File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3427, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-2-ea57e03ba088>", line 1, in <module>
    runfile('/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py', wdir='/Users/brando/ML4Coq/playground/multiprocessing_playground')
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
    pydev_imports.execfile(filename, global_vars, local_vars)  # execute the script
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py", line 95, in <module>
    train_fake_data()
  File "/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py", line 83, in train_fake_data
    losses = pool.map(get_loss, batch)
  File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/multiprocessing/pool.py", line 290, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/multiprocessing/pool.py", line 683, in get
    raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '[tensor(0.5237, grad_fn=<MseLossBackward>)]'. Reason: 'RuntimeError('Cowardly refusing to serialize non-leaf tensor which requires_grad, since autograd does not support crossing process boundaries.  If you just want to transfer the data, call detach() on the tensor before serializing (e.g., putting it on the queue).')'

I am sure I want to do this. How should I be doing this?


related links from research:

Hi,

This is exactly what I mentioned above with “To get a small gain, you would need to use multiple processes which are not supported by autograd out of the box and so you will need to unpack/pack Tensors when sending them to make sure it will work.”: you won’t be able to do multiprocessing with Tensors that are involved in autograd out of the box.
You will need to unpack/repack them and do the missing part of the autograd yourself.

Note that we now have a distributed-aware version of autograd (Distributed RPC Framework — PyTorch 1.7.1 documentation), but it is not as full-featured as the vanilla autograd.
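For anyone curious, here is a minimal sketch of that distributed autograd / RPC usage, loosely following the RPC docs (two CPU processes; the worker names, port and toy function are illustrative):

import os
import torch
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp

def remote_square(x):
    # runs on worker1; the RPC framework records it in the distributed graph
    return x * x

def run(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    rpc.init_rpc(f"worker{rank}", rank=rank, world_size=world_size)
    if rank == 0:
        t = torch.randn(3, requires_grad=True)
        with dist_autograd.context() as context_id:
            y = rpc.rpc_sync("worker1", remote_square, args=(t,))
            loss = y.sum()
            # the backward pass crosses the process boundary for you
            dist_autograd.backward(context_id, [loss])
            # gradients live in the context, not in t.grad
            print(dist_autograd.get_gradients(context_id)[t])
    rpc.shutdown()

if __name__ == "__main__":
    world_size = 2
    mp.spawn(run, args=(world_size,), nprocs=world_size)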

Just to clarify, are you saying what I am trying to do cannot be done with nn.parallel.DistributedDataParallel (DistributedDataParallel — PyTorch master documentation)?

No,

DDP would be another alternative. But it won’t give you the flexibility of passing an arbitrary function that computes what you want on different processes. You will need to give it an nn.Module with your model, which might not be a restriction in your case.

Ah, ok, thanks! I appreciate the pointer to the other docs; I will also check them!

My current goal is to give a chunk of batches, or single examples in a list, to a model (that is shared in parallel), and then do the optimizer step once all the losses have been computed (like in my example). I will have to read up on DDP to see how that’s done there (note that one of my models takes in a list of examples while the other takes a tensor). I will try to parallelize both.

——

I basically want to translate my self-contained example to the right PyTorch parallelization.

@albanD I was reading the API docs for DistributedDataParallel — PyTorch 2.1 documentation and it says this (which worries me for my application):

This container parallelizes the application of the given module by splitting the input across the specified devices by chunking in the batch dimension

The chunking is problematic. My models do not receive tensors. They receive special parsed objects that I need to process. Batching these is not easy, which is why I am resorting to distributed training. Let’s assume that I am right that batching is hard. If that is the case, then how would DDP “chunk” my list of Python objects? Would it work at all?

e.g.

def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    x = torch.randn(20, 10)  # imagine we change this to a list [ obj1, ..., obj_batch_size]
    outputs = ddp_model(x)
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()

What would happen if x is a list of objects instead?

If you give a list, AFAIK, it will slice it and so if it is a list of 10 elements and you split that over 5 processes, each process will get 2 elements from the list.
So x[0:2] would go to the first process, x[2:4] would go to the second, etc

Cool! I will test it out, thanks @albanD!

But the above example is confusing. How does the demo_basic code actually receive a batch? Is there an example with an actual data loader? @albanD


Update:

Perhaps what I need is an example with a dataloader… it seems this page has one: Distributed data parallel training in Pytorch

Well, usually the whole block between optimizer.zero_grad() and optimizer.step() is inside a for-loop that takes samples x from a dataset to perform the training.
So this example is missing the training loop that would actually be fetching the batches.
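Something like this (a generic sketch of that loop structure on a single process with made-up data; in the DDP tutorial, x and labels would come from your per-process dataloader and the model would be the ddp_model from setup):

import torch
from torch import nn, optim

model = nn.Linear(10, 5)
optimizer = optim.SGD(model.parameters(), lr=0.001)
loss_fn = nn.MSELoss()
dataset = [(torch.randn(20, 10), torch.randn(20, 5)) for _ in range(3)]  # fake batches

for x, labels in dataset:          # the loop the tutorial snippet leaves out
    optimizer.zero_grad()
    outputs = model(x)             # with DDP this would be ddp_model(x)
    loss_fn(outputs, labels).backward()
    optimizer.step()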

@albanD Is the right way to think about it, then, that ddp_model(x) is what “blocks” so that the next lines are executed with the right coordinated gradients? (The strange thing is that the demo_basic code I pasted is usually the function that a process executes on its own, which is weird, because then how do you make sure that each process samples the same x if they each have their own dataloader? I am assuming that the demo_basic(rank, world_size) code shown is not the actual code that runs, and that it gets compiled to some other magic that makes it work properly.)

Thanks for the help!

Source of example: Getting Started with Distributed Data Parallel — PyTorch Tutorials 1.7.1 documentation

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

I am really not an expert on how to use DDP, but from what I understand, each process has its own dataset/dataloader and they use a distributed sampler to make sure they don’t sample the same thing: torch.utils.data — PyTorch 1.7.1 documentation
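A minimal runnable sketch of that setup (gloo backend on CPU; the dataset, sizes, port and hyper-parameters are made up): each process gets its own DataLoader over a DistributedSampler shard, and DDP averages the gradients during backward():

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch import nn, optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

def run(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12356"
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    ds = TensorDataset(torch.randn(128, 10), torch.randn(128, 5))
    # each process sees a disjoint shard of the dataset
    sampler = DistributedSampler(ds, num_replicas=world_size, rank=rank, shuffle=True)
    loader = DataLoader(ds, batch_size=8, sampler=sampler)

    ddp_model = DDP(nn.Sequential(nn.Linear(10, 10), nn.ReLU(), nn.Linear(10, 5)))
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.01)
    loss_fn = nn.MSELoss()

    for epoch in range(2):
        sampler.set_epoch(epoch)   # reshuffle consistently across processes
        for x, y in loader:
            optimizer.zero_grad()
            loss_fn(ddp_model(x), y).backward()   # gradients are all-reduced here
            optimizer.step()

    dist.destroy_process_group()

if __name__ == "__main__":
    mp.spawn(run, args=(2,), nprocs=2)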

I know you said you are not an expert with DDP (if you know who is, can you forward my question below?), but when using CPUs only, how do I pass the ranks to DDP? If I try to use a modified version of the basic demo, it complains that there are no GPUs - so the examples are meant for GPU, not CPU (DDP interprets the rank as a GPU id or something).

In summary, when I create the DDP model, which of the following is correct (for multi-CPU):

    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()

or

    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel()

for it to work properly on multiple CPUs?

Note that usually one does ddp_model = DDP(model, device_ids=[rank]) for GPU, to move the model to the GPU with that rank (at least that’s what my comment says).

Whole self-contained example (without dataloaders… that was my next step):

"""
Based on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

Note: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use
different communication backends and are not restricted to being executed on the same machine.
"""
import torch
from torch import nn, optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

import os

num_epochs = 5
batch_size = 8
Din, Dout = 10, 5
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
data = [(i*data_x, i*data_y) for i in range(num_epochs)]

class OneDeviceModel(nn.Module):
    """
    Toy example for a model run in parallel but not distributed across GPUs
    (only processes with their own GPU or hardware).
    """
    def __init__(self):
        super().__init__()
        self.net1 = nn.Linear(Din, Din)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(Din, Dout)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))

def setup_process(rank, world_size, backend='gloo'):
    """
    Initialize the distributed environment (for each process).

    gloo: is a collective communications library (https://github.com/facebookincubator/gloo). My understanding is that
    it's a library/API for processes to communicate/coordinate with each other/the master. It's a backend library.
    """
    # set up the master's ip address so this child process can coordinate
    # os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
    if torch.cuda.is_available():
        backend = 'nccl'
    # Initializes the default distributed process group, and this will also initialize the distributed package.
    dist.init_process_group(backend, rank=rank, world_size=world_size)

def cleanup():
    """ Destroy a given process group, and deinitialize the distributed package """
    dist.destroy_process_group()

def run_parallel_training_loop(rank, world_size):
    """
    Distributed function to be implemented later.

    This is the function that is actually run in each distributed process.

    Note: as DDP broadcasts model states from the rank 0 process to all other processes in the DDP constructor,
    you don't need to worry about different DDP processes starting from different model parameter initial values.
    """
    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    setup_process(rank, world_size)

    # create model and move it to GPU with id rank
    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()
    # ddp_model = DDP(model, device_ids=[rank])
    ddp_model = DDP(model)

    for batch_idx, batch in enumerate(data):
        x, y = batch
        loss_fn = nn.MSELoss()
        optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

        optimizer.zero_grad()
        outputs = ddp_model(x)
        labels = y.to(rank) if torch.cuda.is_available() else y
        # Gradient synchronization communications take place during the backward pass and overlap with the backward computation.
        loss_fn(outputs, labels).backward()  # When the backward() returns, param.grad already contains the synchronized gradient tensor.
        optimizer.step()  # TODO how does the optimizer know to do the gradient step only once?

    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # Destroy a given process group, and deinitialize the distributed package
    cleanup()

def main():
    print()
    print('running main()')
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # args
    world_size = mp.cpu_count()
    mp.spawn(run_parallel_training_loop, args=(world_size,), nprocs=world_size)

if __name__ == "__main__":
    print('starting __main__')
    main()
    print('Done!\a\n')

How is the RPC framework different from what this tutorial shows (Writing Distributed Applications with PyTorch — PyTorch Tutorials 1.7.1 documentation)? That uses send/recv and all_reduce etc… There are so many options that this is confusing and frustrating. I don’t understand why it can’t just do an rpc.map or something for me and allow me to use gradients (I understand the error), so that

with Pool(100) as pool:
    losses = pool.map(forward, batch)
    torch.mean(torch.stack(losses)).backward()
optimizer.step()

works…


rpc example: python - How does one implement parallel SGD with the pytorch autograd RPC library so that gradients can be received from different processes without errors? - Stack Overflow


My intention was to parallelize meta-learning with torchmeta + higher, but it seems that path is dead using DDP until higher is incorporated into the core of PyTorch. See:

but the RPC path might not be dead: