DataParallel or DistributedDataParallel in a cluster without GPUs

I would like to know whether it is possible to use DataParallel or DistributedDataParallel in a cluster without GPUs. I want to parallelize training over a large number of CPUs; is this possible with the current implementations?

A pure CPU cluster is also supported by the distributed package. See the docs for torch.distributed for details. You probably can't use the built-in DDP, but you can use the primitives in the distributed package to write your own training code.
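For example, here is a minimal sketch of the "write your own" approach with the gloo backend: all-reduce the gradients after each backward pass so every process holds the same average (average_gradients is just an illustrative helper name):

import torch.distributed as dist

def average_gradients(model):
    # Sum each parameter's gradient across all processes, then divide by
    # the world size so every replica holds the same averaged gradient.
    world_size = float(dist.get_world_size())
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
            param.grad.data /= world_size

You would call average_gradients(model) between loss.backward() and optimizer.step() in every process.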

Thanks Simon for your answer. I saw that it is possible to write the training code using the distributed package, and the tutorial also has an example of writing your own DistributedDataParallel for CPU. But I am asking because the tutorial recommends using the DistributedDataParallel provided in the framework, and as far as I saw in the documentation, the current implementation only supports clusters with GPUs. Is that true? Or am I missing something?

Hi! I just went to look it up and apparently we have a DistributedDataParallelCPU class :slight_smile: https://github.com/pytorch/pytorch/blob/master/torch/nn/parallel/distributed_cpu.py
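Usage is just wrapping the module once the process group is initialized. A minimal sketch (model here is any nn.Module you have built):

import torch.distributed as dist
import torch.nn.parallel as parallel

dist.init_process_group(backend="gloo", init_method="env://")
net = parallel.DistributedDataParallelCPU(model)  # model: any nn.Module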

Hi! Thanks Simon! It seems the documentation for this class is not included in the official docs; that is why I was not able to find it :frowning: Next time I will also look in the repository :slight_smile: Now I will try to use it!

:slight_smile: I submitted a PR to enable it in the official docs.

Thanks! I was trying to use the DistributedDataParallelCPU() class and I have some doubts that maybe you can help me clarify. In order to understand how this class works, I created a sample script with a VGG16 and the CIFAR10 dataset. To execute the code I am using the launcher provided with PyTorch, torch.distributed.launch. At the moment I am only using one node with two processes. This is the command line that I am using to execute the code:

python -m torch.distributed.launch --nnodes=1 --node_rank=0 --nproc_per_node=2 --master_addr="127.0.0.1" --master_port=1234 ./distributed_cpu_trainning.py
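As I understand it, the launcher exports RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT as environment variables for each process it spawns (which is what init_method='env://' reads) and passes --local_rank to the script, hence the argparse flag in the code below. A quick way to verify from inside the script:

import os
# set by torch.distributed.launch for every spawned process
print(os.environ["RANK"], os.environ["WORLD_SIZE"],
      os.environ["MASTER_ADDR"], os.environ["MASTER_PORT"])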

Below you can find the code (it may be a little ugly, but it is only for testing purposes):

import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.datasets as datasets
from torchvision import models

import torch.nn.parallel as parallel
import torch.distributed as dist
import os
import time


from torch.utils.data import distributed as dataDistributed

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int)
args = parser.parse_args()

normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

# init_method='env://' reads RANK, MASTER_ADDR and MASTER_PORT from the
# environment variables that torch.distributed.launch sets for each process
dist.init_process_group(backend="gloo", init_method='env://', world_size=2)

train_dataset = datasets.CIFAR10(root='./data', train=True,
    transform=transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomCrop(32, 4),
        transforms.ToTensor(),
        normalize,
    ]), download=True)

train_sampler = dataDistributed.DistributedSampler(train_dataset)

trainloader = torch.utils.data.DataLoader(train_dataset,
    batch_size=128, shuffle=(train_sampler is None),
    num_workers=2, pin_memory=True, sampler=train_sampler)
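# Note: DistributedSampler partitions the dataset across processes; the
# batch_size above is still per-process, so with two processes the effective
# global batch is 2 * 128 = 256.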


#testloader = torch.utils.data.DataLoader(
#    datasets.CIFAR10(root='./data', train=False, transform=transforms.Compose([
#    transforms.ToTensor(),
#    normalize,
#    ])),
#    batch_size=127, shuffle=False,
#    num_workers=2, pin_memory=True)


classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # the feature extractor below is models.vgg16_bn().features written
        # out explicitly, split into three sequential blocks
        self.features01 = nn.Sequential(
                nn.Conv2d(3, 64, kernel_size=3, padding=1),
                nn.BatchNorm2d(64),
                nn.ReLU(True),
                nn.Conv2d(64, 64, kernel_size=3, padding=1),
                nn.BatchNorm2d(64),
                nn.ReLU(True),
                nn.MaxPool2d(kernel_size=2, stride=2),
                nn.Conv2d(64, 128, kernel_size=3, padding=1),
                nn.BatchNorm2d(128),
                nn.ReLU(True),
                nn.Conv2d(128, 128, kernel_size=3, padding=1),
                nn.BatchNorm2d(128),
                nn.ReLU(True),
                nn.MaxPool2d(kernel_size=2, stride=2),
                )
        self.features02 = nn.Sequential(
                nn.Conv2d(128, 256, kernel_size=3, padding=1),
                nn.BatchNorm2d(256),
                nn.ReLU(True),
                nn.Conv2d(256, 256, kernel_size=3, padding=1),
                nn.BatchNorm2d(256),
                nn.ReLU(True),
                nn.Conv2d(256, 256, kernel_size=3, padding=1),
                nn.BatchNorm2d(256),
                nn.ReLU(True),
                nn.MaxPool2d(kernel_size=2, stride=2),
                nn.Conv2d(256, 512, kernel_size=3, padding=1),
                nn.BatchNorm2d(512),
                nn.ReLU(True),
                nn.Conv2d(512, 512, kernel_size=3, padding=1),
                nn.BatchNorm2d(512),
                nn.ReLU(True),
                nn.Conv2d(512, 512, kernel_size=3, padding=1),
                nn.BatchNorm2d(512),
                nn.ReLU(True),
                nn.MaxPool2d(kernel_size=2, stride=2),
                )
        self.features03 = nn.Sequential(
                nn.Conv2d(512, 512, kernel_size=3, padding=1),
                nn.BatchNorm2d(512),
                nn.ReLU(True),
                nn.Conv2d(512, 512, kernel_size=3, padding=1),
                nn.BatchNorm2d(512),
                nn.ReLU(True),
                nn.Conv2d(512, 512, kernel_size=3, padding=1),
                nn.BatchNorm2d(512),
                nn.ReLU(True),
                nn.MaxPool2d(kernel_size=2, stride=2),
                )

        
        self.classifier = nn.Sequential(
                            nn.Dropout(),
                            nn.Linear(512, 512),
                            nn.ReLU(True),
                            nn.Dropout(),
                            nn.Linear(512, 512),
                            nn.ReLU(True),
                            nn.Linear(512, 10),
                            )

    def forward(self, x):
        
        x = self.features01(x)
        x = self.features02(x)
        x = self.features03(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)

        return x

model = Net()
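# DistributedDataParallelCPU keeps a full model replica in each process and
# averages the gradients across processes after every backward pass, so each
# replica applies the same update locally.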
net = parallel.DistributedDataParallelCPU(model)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)


start = time.time()
for epoch in range(2): # loop over the dataset multiple times
    train_sampler.set_epoch(epoch)  # reshuffle this process's shard each epoch
    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        # get the inputs
        inputs, labels = data
        
        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        print(outputs.size())
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()

    running_loss = 0.0
stop = time.time()
print('Finished Training')
print(stop-start)

I am not sure if the code is working properly, because when I print the size of the output, both processes tell me that the size is [128, 10]. As you can see in the code, the batch_size is 128, so I was expecting the output size on each process to be half of the batch size. Am I doing something wrong? In order to split the dataset I am using torch.utils.data.distributed.DistributedSampler.

I am going to answer my own question: DistributedSampler does not divide the batch size, it only divides the dataset, so each process still draws full batches of 128 from its own shard. I would like to ask another question. When the gradients are mixed, I expect all the processes to send their gradients to the master process, which then executes the update and sends back the new weights. Does it work like this? Because if all the gradients are sent to all processes, I will have a peak of memory consumption. Can you help me clarify this?
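If I want the effective global batch to stay at 128 rather than 128 per process, I presumably need to divide the batch size by the world size myself. A sketch of what I mean:

world_size = dist.get_world_size()
per_process_batch = 128 // world_size  # 64 per process with two processes
trainloader = torch.utils.data.DataLoader(train_dataset,
    batch_size=per_process_batch, shuffle=False,
    num_workers=2, pin_memory=True, sampler=train_sampler)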

I have the same question here…
What is the best method to distribute learning across multiple CPUs (multiprocessing)?

And to Dhorka's last point: how do you retrieve the final, combined, averaged model from the main process?
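My guess, based on the all_reduce sketch earlier in the thread, is that every replica holds identical weights after each synchronized step, so saving from a single rank should be enough. Something like (the file name is just an example):

if dist.get_rank() == 0:
    # all replicas are identical after each synchronized update,
    # so one rank's copy is the final averaged model
    torch.save(net.module.state_dict(), "model.pt")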

Thanks for any update and help to this question.