Multiprocessing and dataloader

Hello there,

I’ve looked around for quite some time and could not find a post similar to this one. I am trying to train a CNN for super-resolution on a cluster with CPUs. Unfortunately, I do not have access to GPUs right now.
So I am using torch.multiprocessing and trying to run a for loop over several worker processes, as explained here: https://github.com/pytorch/examples/tree/master/mnist_hogwild

What seems to happen is that the DataLoader does not partition the training data across the workers; instead, each worker computes the forward pass on the whole training set. The loss, when printed inside the training loop, appears to be replicated exactly once per worker…

Here is the code. It runs locally on my computer (on 4 cores) but crashes from memory overload on the cluster when I try to use 20 cores…

import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from cnn2 import MyModel
from torch.utils import data

class Dataset(data.Dataset):

    def __init__(self, list_IDs):
        self.list_IDs = list_IDs

    def __len__(self):
        return len(self.list_IDs)

    def __getitem__(self, index):
        ID = self.list_IDs[index]
        sample = torch.load('data/' + ID)
        T100MHz = torch.DoubleTensor(sample[0]).detach()
        T250MHz = torch.DoubleTensor(sample[1]).detach()
        return T100MHz, T250MHz

def train(model):
    for data_library['100MHz'], data_library['250MHz'] in training_generator:
        optimizer.zero_grad()
        loss = loss_function(model(data_library['100MHz']), data_library['250MHz'])
        loss.backward()
        print("\nEpoch " + str(epoch) + ": Training Loss = " + str(loss.item()))
        optimizer.step()
    return loss

def val(model):
    for data_library['100MHz'], data_library['250MHz'] in validation_generator:
        optimizer.zero_grad()
        loss = loss_function(model(data_library['100MHz']), data_library['250MHz'])
        scheduler.step()
    return loss

def _set_seed(worker_id):
    torch.manual_seed(worker_id)

if __name__ == '__main__':
    device = torch.device("cpu")
    num_processes = 4
    forward_model = MyModel().to(device)
    # model.load_state_dict(torch.load(os.path.join(os.getcwd() + '/models/' + 'conv_net_model_17_jun_2020.pt')))
    model_name = '1D_conv_net_model.pt'
    print(forward_model)
    forward_model = forward_model.double()

    max_epochs = 400
    tune_every = 20  # test and validate the model every x epochs
    data_library = {}

    # Load all the data IDs from the txt file
    file_IDs = open('ID_list.txt', 'r').read().split('\n')
    file_IDs = file_IDs[:-1]  # remove last line
    complete_dataset = Dataset(file_IDs)

    saved_models_path = os.getcwd() + '/models/'
    if not os.path.isdir(saved_models_path):
        os.makedirs(saved_models_path)

    # create your optimizer
    optimizer = optim.SGD(forward_model.parameters(), lr=0.05, momentum=0.9)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.5)

    # Set training parameters
    params = {'batch_size': 32,
              'shuffle': True,
              'num_workers': num_processes,
              'worker_init_fn': _set_seed}

    # partition the dataset
    lengths = [int(np.ceil(len(complete_dataset) * 0.8)),
               int(np.floor(len(complete_dataset) * 0.1)),
               int(np.floor(len(complete_dataset) * 0.1))]
    training_set, validation_set, evaluation_set = torch.utils.data.random_split(complete_dataset, lengths)
    training_generator = data.DataLoader(training_set, **params)
    validation_generator = data.DataLoader(validation_set, **params)
    evaluation_generator = data.DataLoader(evaluation_set, **params)

    loss_function = nn.MSELoss()
    validation_loss = []
    validation_loss.append(np.inf)
    training_loss = []

    for epoch in range(max_epochs):
        print('\nEpoch : ' + str(epoch))
        if epoch % tune_every == 0 and epoch > 0:
            # validation
            forward_model.eval()
            if num_processes > 1:
                forward_model.share_memory()
                processes = []
                for rank in range(num_processes):
                    loss = mp.Process(target=val, args=(forward_model,))
                    loss.start()
                    processes.append(loss)
                for loss in processes:
                    loss.join()
            else:
                loss = val(forward_model)
            validation_loss.append(loss.item())
            print("\nEpoch " + str(epoch) + ": Validation Loss = " + str(loss.item()))
            if validation_loss[-1] < validation_loss[-2]:
                torch.save(forward_model.state_dict(), os.path.join(saved_models_path + model_name))
                np.save('logs.npy', [training_loss, validation_loss])
                print('Validation loss improved : saving model')
        else:
            forward_model.train()
            if num_processes > 1:
                forward_model.share_memory()
                processes = []
                for rank in range(num_processes):
                    loss = mp.Process(target=train, args=(forward_model,))
                    loss.start()
                    processes.append(loss)
                for loss in processes:
                    loss.join()
            else:
                loss = train(forward_model)
            training_loss.append(loss.item())
            print("\nEpoch " + str(epoch) + ": Training Loss = " + str(loss.item()))

You could try to use a DistributedSampler for the Hogwild example as described in this post. :wink:
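For reference, here is a minimal, self-contained sketch of the idea. Everything in it (the run_training helper, WORLD_SIZE, the nn.Linear stand-in for MyModel and the TensorDataset stand-in for your Dataset) is hypothetical and only illustrates the pattern: each spawned process builds its own DataLoader around a DistributedSampler keyed on its rank, so the processes iterate over disjoint shards of the data instead of all iterating over the full training set. The DistributedSampler can be used here without initializing torch.distributed because num_replicas and rank are passed explicitly.

import torch
import torch.nn as nn
import torch.multiprocessing as mp
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

WORLD_SIZE = 4  # hypothetical: number of Hogwild training processes

def run_training(rank, model, dataset, epoch):
    # Shard the dataset by rank: each process sees a disjoint 1/WORLD_SIZE
    # subset instead of the whole training set.
    sampler = DistributedSampler(dataset, num_replicas=WORLD_SIZE,
                                 rank=rank, shuffle=True)
    sampler.set_epoch(epoch)  # reshuffle every epoch, shards stay disjoint
    loader = DataLoader(dataset, batch_size=8, sampler=sampler)

    optimizer = torch.optim.SGD(model.parameters(), lr=0.05)
    loss_function = nn.MSELoss()
    for x, y in loader:
        optimizer.zero_grad()
        loss = loss_function(model(x), y)
        loss.backward()
        optimizer.step()  # Hogwild: updates the shared parameters in place
    print('rank ' + str(rank) + ': last training loss = ' + str(loss.item()))

if __name__ == '__main__':
    # Dummy stand-ins so the sketch runs on its own; swap in MyModel() and
    # Dataset(file_IDs) from your script.
    model = nn.Linear(16, 16)
    dataset = TensorDataset(torch.randn(256, 16), torch.randn(256, 16))

    model.share_memory()  # put the parameters in shared memory before forking
    processes = []
    for rank in range(WORLD_SIZE):
        p = mp.Process(target=run_training, args=(rank, model, dataset, 0))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

In your script the equivalent change would be to pass the rank into train()/val() and build the DataLoader there with sampler=DistributedSampler(training_set, num_replicas=num_processes, rank=rank), dropping 'shuffle': True from params, since a DataLoader does not accept shuffle together with a custom sampler.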

It literally took you seconds to solve a problem that has been bugging me for days. Works wonderfully. Immense thanks, sir!