Multiprocessing batches on CPU with custom layer

Hi all,

I created a neural network with a custom layer in PyTorch which needs to run on the CPU, and the layer is written in a way that it can only process one batch at a time. To speed up training, I would like to use multiprocessing to train the model on N batches in parallel (N being the number of cores of my CPU). Once these batches are processed, I would like to backpropagate the loss and keep training the model on the next N batches. Is there any way to do this using torch.multiprocessing? Or should I just use nn.DataParallel for this?

Thanks in advance for your help!

Personally, I wouldn’t use multiprocessing to parallelise across physical cores; I would instead just go fully distributed, which is the easier approach.

Here is an example of how you can do this. Essentially, all you need to do is write your script and model using DistributedDataParallel, ensure that the backend can find the appropriate WORLD_SIZE variable, and then execute the job through MPI.

Here is an example I just wrote and tested:

import os
import torch
from torch import nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

if __name__ == "__main__":

    # mpirun sets the OMPI_* variables; expose WORLD_SIZE so the env:// init method can find it
    world_size = int(os.environ['OMPI_COMM_WORLD_SIZE'])
    os.environ['WORLD_SIZE'] = str(world_size)
    rank = int(os.environ['OMPI_COMM_WORLD_RANK'])

    # gloo is the CPU backend; MASTER_ADDR and MASTER_PORT are read from the environment
    dist.init_process_group(backend='gloo', init_method='env://', rank=rank)

    if rank == 0:
        print("Process group initialised...")

    model = nn.Linear(10, 10)

    # device_ids=None because this is CPU-only
    ddp_model = DDP(model, device_ids=None)

    print(f"Rank {rank} successfully init'd DDP model...")

DDP handles the parallelisation of the model and you can train it as you would locally, i.e., with local calls to backward and opt.step; the communication is abstracted away. If you’re doing CPU-only training, make sure you use an appropriate backend in the call to init_process_group, and when constructing the DDP wrapper set device_ids to None.
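For completeness, a training step with the DDP-wrapped model looks just like serial training. Here is a minimal sketch continuing inside the __main__ block above; the dummy data, MSE loss and SGD optimiser are just placeholders, not part of your actual setup:

    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01)
    loss_fn = nn.MSELoss()

    for step in range(5):
        # dummy batch; in practice this comes from your DataLoader
        inputs = torch.randn(32, 10)
        targets = torch.randn(32, 10)

        optimizer.zero_grad()
        outputs = ddp_model(inputs)   # forward pass on CPU
        loss = loss_fn(outputs, targets)
        loss.backward()               # DDP averages gradients across ranks here
        optimizer.step()              # every rank applies the same update

    if rank == 0:
        print("Finished dummy training loop")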

Then to run a script like this on 2 CPUs, I use the following:

#!/bin/bash

source activate {conda_environment}

export MASTER_ADDR='localhost'
export MASTER_PORT=12340

mpirun -np 2 python main.py

If you’re doing this on a local machine I would recommend not setting -np to the total number of physical cores on the machine.

Hi Jamie, thanks for your reply. I am trying to understand what your script does. In the meantime, I have seen this alternative solution. What do you think?

import torch
from torch import nn
import multiprocessing as mp

device = torch.device("cpu")
model.to(device)  # model assumed to be defined earlier

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

num_epochs = 10
num_workers = 4  # number of CPU cores to use

def train_worker(batch_idx, batch, model, criterion):
    images, labels = batch
    images = images.to(device)
    labels = labels.to(device)

    # Forward pass
    outputs = model(images)
    loss = criterion(outputs, labels)

    # Backward and optimize
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    print('Batch [{}/{}], Loss: {:.4f}'.format(batch_idx, len(train_loader), loss.item()))

with mp.Pool(processes=num_workers) as pool:
    for epoch in range(num_epochs):
        for i, batch in enumerate(train_loader):
            pool.apply_async(train_worker, args=(i, batch, model, criterion))

    # wait for all submitted batches to finish before leaving the pool
    pool.close()
    pool.join()

I don’t think your solution will work, not because you’re using multiprocessing, but because each worker just gets its own copy of the model; no communication occurs between the workers, so you won’t end up with a synchronised model. Personally, I find working with multiprocessing more trouble than it’s worth.

This is why you should use PyTorch’s DataParallel or DistributedDataParallel functionality, as it will synchronise and communicate gradients between the workers. You wrap the model object in one of these and the hard parts are handled for you.
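To make concrete what gets handled for you: after each backward pass, DDP effectively averages the gradients across all processes so every replica takes the same optimiser step. A rough hand-rolled sketch of that idea (assuming a process group has already been initialised, as in my script above) would be:

import torch.distributed as dist

def average_gradients(model):
    # roughly what DDP automates: sum each gradient over all ranks,
    # then divide by the world size so every replica sees the same average
    world_size = dist.get_world_size()
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            param.grad /= world_size

# usage after each backward pass: loss.backward(); average_gradients(model); optimizer.step()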

The code I provided before assigns each Python instance a rank from the environment variables (initialised when executed using mpirun) and sets the WORLD_SIZE environment variable (the total number of instances, i.e. models trained simultaneously). Then dist.init_process_group essentially allows the instances, identified by their rank, to communicate with each other (gloo is the default backend for CPU communication).

Then the model is wrapped using DDP, which essentially means each Python instance trains a replica of the model on a different segment of the training data; by communicating gradients across processes, every replica stays synchronised, i.e., its parameters are updated with the same gradients. However, you also need to set up a DistributedSampler, which will ensure that each process trains on the correct partition of the data so there is no overlap or redundant computation (see the sketch below). The upside is that when you train the model you write more or less the same code as you would when training serially, because the communication is handled for you.
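For the data partitioning, here is a minimal sketch of wiring up a DistributedSampler; train_dataset, num_epochs, criterion and optimizer are assumed to be defined elsewhere in your script, and rank and ddp_model come from the earlier example:

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# each rank gets a disjoint shard of the dataset
sampler = DistributedSampler(train_dataset, num_replicas=dist.get_world_size(), rank=rank)
train_loader = DataLoader(train_dataset, batch_size=32, sampler=sampler)

for epoch in range(num_epochs):
    sampler.set_epoch(epoch)  # so the shuffle differs between epochs
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        loss = criterion(ddp_model(inputs), targets)
        loss.backward()
        optimizer.step()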

Then, by executing the script using MPI, you’re running -np instances of Python.