torch.distributed: "Default process group has not been initialized" despite calling init_process_group

I am trying to implement a basic neural network distributed over 4 processes on a single CPU machine, using the torch.distributed package with the gloo backend.

Here is my current code (the model is defined in another file):


import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torch.distributed as dist
import torchvision.transforms as transforms
import numpy as np
import os 
import argparse
import torchvision
from torch.multiprocessing import Process


from models.model import Network



def evaluate(model, dataloader, criterion, device):
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0
    test_loss = 0.0

    with torch.no_grad():
        for i, data in enumerate(dataloader, 0):
            inputs, labels = data[0].to(device), data[1].to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            test_loss += loss.item()

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    return correct, total, test_loss


def average_gradients(model):
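    # sum each parameter's gradient across all workers, then divide by the world size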
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
        param.grad.data /= size


def main(local_rank, world_size):

    parser = argparse.ArgumentParser()
    parser.add_argument("--local-rank", type=int)
    args = parser.parse_args()

    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'     
 

    print(torch.distributed.is_available())
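    # this is the call that raises the RuntimeError shown in the traceback below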
    dist.init_process_group(backend='gloo', init_method='env://', rank=int(os.environ['RANK']), world_size=dist.get_world_size())

    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    print(f"Process {dist.get_rank()}/{dist.get_world_size()} is using device {device}")

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    batch_size = 4

    train_set = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
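    # the distributed sampler partitions the training set so each rank sees a different shard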
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_set, num_replicas=dist.get_world_size(), rank=dist.get_rank())
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, sampler=train_sampler)

    test_set = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
    test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=2)

    model = Network()
    model.to(device=device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

    for epoch in range(6):
        #train(model, train_loader, criterion, optimizer, device)

        running_loss = 0.0
        for i, data in enumerate(train_loader, 0):
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data[0].to(device), data[1].to(device)
            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()

            # average gradients across all workers before the optimizer step
            average_gradients(model)
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            if i % 2000 == 1999:    # print every 2000 mini-batches
                print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
                running_loss = 0.0

        # evaluation is done by the master process (rank 0)
        if dist.get_rank() == 0:
            # Print validation accuracy and loss
            correct, total, test_loss = evaluate(model, test_loader, criterion, device)
            print(correct, total, test_loss)
            print('scores')
            print(f'Epoch {epoch + 1}:')
            print('Validation Accuracy: %.2f %%' % (100 * correct / total))
            print('Validation Loss: %.3f' % (test_loss / len(test_loader)))



if __name__ == '__main__':
    world_size = 4  # Set the number of processes
    processes = []
    for rank in range(world_size):
        p = Process(target=main, args=(rank, world_size))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

I then tried running the code using:

python distributed_train.py

This gave me the following error, for which I cannot find a solution:

True
Process Process-1:
Traceback (most recent call last):
  File "/home/rohitkumar/.conda/envs/november/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/rohitkumar/.conda/envs/november/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/rohitkumar/github_code/distributed-neural-network/distributed_train.py", line 58, in main
    dist.init_process_group(backend='gloo', init_method='env://', rank=int(os.environ['RANK']), world_size=dist.get_world_size())
  File "/home/rohitkumar/.conda/envs/november/lib/python3.10/site-packages/torch/distributed/distributed_c10d.py", line 1492, in get_world_size
    return _get_group_size(group)
  File "/home/rohitkumar/.conda/envs/november/lib/python3.10/site-packages/torch/distributed/distributed_c10d.py", line 785, in _get_group_size
    default_pg = _get_default_group()
  File "/home/rohitkumar/.conda/envs/november/lib/python3.10/site-packages/torch/distributed/distributed_c10d.py", line 940, in _get_default_group
    raise RuntimeError(
RuntimeError: Default process group has not been initialized, please make sure to call init_process_group.

The same traceback is then printed by each of the other three processes.

A similar error is displayed when I instead try:

python -m torch.distributed.launch --nproc_per_node=4 --nnodes=1 distributed_train.py

The error seems to come from the init_process_group() call, but I cannot understand what I am doing wrong.
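
For reference, the worker-initialization pattern in the PyTorch "Writing Distributed Applications with PyTorch" tutorial looks roughly like the sketch below: the rank and world size are passed to init_process_group explicitly rather than queried from the process group. This is my sketch of the tutorial pattern, not my actual script:

import os
import torch.distributed as dist

def init_process(rank, world_size, backend='gloo'):
    # each spawned process already knows its own rank and the total
    # number of processes, so nothing is read back from the group
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group(backend, rank=rank, world_size=world_size)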

Any help would be highly appreciated, thanks!