Dataloader fails when using num_workers > 1

I am using torch.distributed to launch a distributed training task. I am also trying to use num_workers > 1 in the DataLoader to optimize the training speed.

However, I get the following "Segmentation Fault" error whenever I use num_workers > 1:

Error: DataLoader worker (pid(s) 17423) exited unexpectedly
Traceback (most recent call last):
  File "/miniconda/envs/iris/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 990, in _try_get_data
    data = self._data_queue.get(timeout=timeout)
  File "/miniconda/envs/iris/lib/python3.6/multiprocessing/queues.py", line 104, in get
    if not self._poll(timeout):
  File "/miniconda/envs/iris/lib/python3.6/multiprocessing/connection.py", line 257, in poll
    return self._poll(timeout)
  File "/miniconda/envs/iris/lib/python3.6/multiprocessing/connection.py", line 414, in _poll
    r = wait([self], timeout)
  File "/miniconda/envs/iris/lib/python3.6/multiprocessing/connection.py", line 911, in wait
    ready = selector.select(timeout)
  File "/miniconda/envs/iris/lib/python3.6/selectors.py", line 376, in select
    fd_event_list = self._poll.poll(timeout)
  File "/miniconda/envs/iris/lib/python3.6/site-packages/torch/utils/data/_utils/signal_handling.py", line 66, in handler
    _error_if_any_worker_fails()
RuntimeError: DataLoader worker (pid 17423) is killed by signal: Segmentation fault.

I tried to analyze the root cause of this error. In my case it comes up in the following scenario: whenever I send metrics (using tensorboard or an equivalent), I get the above segmentation fault once the training resumes.

I use a pre-condition to check whether the current process is the master process and send the metrics only from the master process; once the training resumes after that, I get the SegFault error. Is there a way to prevent the DataLoader from using the master process for data loading, or is there a different way in which we can address this issue? Please let me know.
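For clarity, the flow is roughly like the following (a simplified sketch only, not my actual script; send_metrics here stands in for the tensorboard-like call):

# Simplified sketch of the flow described above (illustrative only).
import torch.distributed as dist

for step, data in enumerate(dataloader):
    loss = train_step(data)                  # regular training step
    if dist.get_rank() == 0:                 # pre-condition: master process only
        send_metrics({'loss': loss.item()})  # tensorboard-like metrics call
    # ... the segmentation fault appears in the DataLoader shortly after this point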

The error is also sometimes reported in a different way, as below:
  File "dist_pytorch/train_simple.py", line 112, in train
    for data in iter(dataloader):
  File "/miniconda/envs/iris/lib/python3.7/site-packages/torch/utils/data/dataloader.py", line 517, in __next__
    data = self._next_data()
  File "/miniconda/envs/iris/lib/python3.7/site-packages/torch/utils/data/dataloader.py", line 1182, in _next_data
    idx, data = self._get_data()
  File "/miniconda/envs/iris/lib/python3.7/site-packages/torch/utils/data/dataloader.py", line 1148, in _get_data
    success, data = self._try_get_data()
  File "/miniconda/envs/iris/lib/python3.7/site-packages/torch/utils/data/dataloader.py", line 986, in _try_get_data
    data = self._data_queue.get(timeout=timeout)
  File "/miniconda/envs/iris/lib/python3.7/multiprocessing/queues.py", line 113, in get
    return _ForkingPickler.loads(res)
  File "/miniconda/envs/iris/lib/python3.7/site-packages/torch/multiprocessing/reductions.py", line 282, in rebuild_storage_fd
    fd = df.detach()
  File "/miniconda/envs/iris/lib/python3.7/multiprocessing/resource_sharer.py", line 57, in detach
    with _resource_sharer.get_connection(self._id) as conn:
  File "/miniconda/envs/iris/lib/python3.7/multiprocessing/resource_sharer.py", line 87, in get_connection
    c = Client(address, authkey=process.current_process().authkey)
  File "/miniconda/envs/iris/lib/python3.7/multiprocessing/connection.py", line 498, in Client
    answer_challenge(c, authkey)
  File "/miniconda/envs/iris/lib/python3.7/multiprocessing/connection.py", line 742, in answer_challenge
    message = connection.recv_bytes(256)  # reject large message
  File "/miniconda/envs/iris/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/miniconda/envs/iris/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/miniconda/envs/iris/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
ConnectionResetError: [Errno 104] Connection reset by peer

Could you explain what exactly you did that seems to cause the segfault? Based on your description, I guess you are checking the worker id somehow inside the Dataset?
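For example, something along these lines inside the Dataset (purely an illustration of what I mean; torch.utils.data.get_worker_info() is the usual way to see which worker is loading a sample, and load_sample is just a placeholder):

# Illustration only: checking the worker id inside a Dataset's __getitem__.
from torch.utils.data import Dataset, get_worker_info

class SomeDataset(Dataset):
    def __getitem__(self, idx):
        worker_info = get_worker_info()  # None in the main process, a WorkerInfo object in a worker
        if worker_info is not None and worker_info.id == 0:
            pass                         # e.g. special-case logic for worker 0
        return self.load_sample(idx)     # load_sample is a placeholder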

Hi,

I have not added anything that should lead to a SegFault. I was only describing the program flow that I thought was somehow leading to this error; my guess might have been wrong.

I am attaching a sample training program which might explain the condition described above.

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
import numpy as np
import argparse
import os
import turibolt as bolt
L2_Loss = nn.MSELoss()

# ---------------------------------------------------------------------------------------------------------------------

class SpeechDataset(Dataset):
    def __init__(self):
        self.T_MAX = 200

    def __len__(self):
        return 40000

    def __getitem__(self, idx):
        mels = np.random.randn(self.T_MAX, 512)
        return torch.from_numpy(mels).type(torch.FloatTensor).transpose(0, 1)

# ---------------------------------------------------------------------------------------------------------------------

class MelsCollate():
    """Zero-pads mel spectrograms in a batch to the length of the longest one."""
    def __init__(self):
        pass

    def __call__(self, batch):
        """Collates a training batch of mel spectrograms.

        PARAMS
        ------
        batch: list of mel tensors of shape (512, T)
        """
        # Right zero-pad all mel sequences to the max mel length
        melseq_lengths, ids_sorted_decreasing = torch.sort(
            torch.LongTensor([x.size(1) for x in batch]),
            dim=0, descending=True)
        max_mel_len = melseq_lengths[0]
        mels_padded = torch.FloatTensor(len(batch), 512, max_mel_len)

        # initialize with zeros
        mels_padded.zero_()
        # copy
        for i in range(len(ids_sorted_decreasing)):
            mels = batch[ids_sorted_decreasing[i]]
            mels_padded[i, :, :mels.size(1)] = mels
        return mels_padded

# ----------------------------------------------------------------------------------------------------------------------

class BasicConvBlock(nn.Module):
    def __init__(self, ndim_in, ndim_out, kernel_size, dilation_factor=1):
        super(BasicConvBlock, self).__init__()
        self.relu = nn.ReLU()
        padding = int((kernel_size - 1) / 2) * dilation_factor
        self.conv1 = nn.Conv1d(ndim_in, ndim_out, kernel_size, stride=1, dilation=dilation_factor, padding=padding)

    def forward(self, input):
        output = self.relu(self.conv1(input)) + input
        return output

# ----------------------------------------------------------------------------------------------------------------------

class ConvStack(nn.Module):
    def __init__(self, ndim_in, ndim_out):
        super(ConvStack, self).__init__()
        self.dilation_factors = [1, 2, 4, 8, 16, 1, 2, 4, 8, 16]
        self.conv_stack = nn.ModuleList([BasicConvBlock(ndim_in=ndim_in, ndim_out=ndim_out, kernel_size=21, dilation_factor=dilation_factor)
                                         for dilation_factor in self.dilation_factors])

    def forward(self, input):
        output = input
        for conv_block in self.conv_stack:
            output = conv_block(output)
        return output

# ----------------------------------------------------------------------------------------------------------------------

def train_step(data, model, optimizer, scheduler, device):
    data = data.to(device)
    optimizer.zero_grad()
    pred_mels = model(data)
    total_loss = L2_Loss(pred_mels, data)
    total_loss.backward()
    optimizer.step()
    scheduler.step()
    return total_loss

# ----------------------------------------------------------------------------------------------------------------------

def train(local_rank, send_bolt_metrics):
    torch.cuda.set_device(local_rank)
    num_epochs = 5

    # set up the distributed backend for managing the distributed training
    torch.distributed.init_process_group('nccl')

    # dataset
    dataset = SpeechDataset()
    dist_sampler = DistributedSampler(dataset)
    dataloader = DataLoader(dataset, batch_size=16, num_workers=2, sampler=dist_sampler, collate_fn=MelsCollate())

    # model
    model = ConvStack(512, 512)

    # set the cuda device to the GPU allocated to the current process
    device = torch.device('cuda', local_rank)
    model = model.to(device)
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank],
                                                      output_device=local_rank)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0003, betas=(0.9, 0.98), eps=1e-09)
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[i * 100000 for i in range(1, 6)], gamma=0.5)

    step = 0
    for epoch in range(num_epochs):
        for data in iter(dataloader):
            loss = train_step(data, model, optimizer, scheduler, device)
            step += 1
            if step % 10 == 0 and local_rank == 0:
                print(epoch, step, loss.detach().cpu().numpy())
                # send metrics
                if send_bolt_metrics:
                    bolt.send_metrics({'loss': loss.detach().cpu().item()})
                # save model
                torch.save(model.state_dict(), 'checkpoints/model_' + str(step) + '.pt')

# parse the local_rank argument from the command line for the current process
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=0, type=int)
parser.add_argument("--send_bolt_metrics", default=False, action='store_true')
args = parser.parse_args()
os.makedirs('checkpoints', exist_ok=True)
train(args.local_rank, args.send_bolt_metrics)

The error occurs when send_bolt_metrics=True; otherwise the training proceeds fine.

bolt.send_metrics is a mechanism via which metrics are sent to a webpage for visualization, similar to tensorboard.
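For reference, it plays roughly the same role as something like this with TensorBoard (an analogy only, not what I actually run; the log directory is made up):

# Rough TensorBoard analogue of what bolt.send_metrics does (analogy only).
from torch.utils.tensorboard import SummaryWriter

writer = SummaryWriter(log_dir='runs/exp')  # made-up log directory
writer.add_scalar('loss', loss.detach().cpu().item(), global_step=step)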

Somehow it seems as if the multiprocessing connection (the resource_sharer connection in the traceback above) is reset by this bolt.send_metrics call, which gives rise to the "Connection reset by peer" error.

I am not sure how to avoid this.
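Would switching the tensor sharing strategy be worth trying here? This is just a guess on my part, based on the rebuild_storage_fd frame in the traceback above, and I have not verified that it helps:

# Untested idea: share tensors through the file system instead of passing
# file descriptors between processes (relevant to rebuild_storage_fd above).
import torch.multiprocessing as mp

mp.set_sharing_strategy('file_system')  # the default strategy on Linux is 'file_descriptor'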