Dataloader fails when using num_workers > 1

I am using torch.distributed to launch and distributed training task. I am also trying to use “num_workers > 1” to optimize the training speed.

However, I get the following “Segmentation Fault” error whenever I using “num_workers > 1” …

Error: DataLoader worker (pid(s) 17423) exited unexpectedly
Traceback (most recent call last):
File “/miniconda/envs/iris/lib/python3.6/site-packages/torch/utils/data/dataloader.py”, line 990, in _try_get_data
data = self._data_queue.get(timeout=timeout)
File “/miniconda/envs/iris/lib/python3.6/multiprocessing/queues.py”, line 104, in get
if not self._poll(timeout):
File “/miniconda/envs/iris/lib/python3.6/multiprocessing/connection.py”, line 257, in poll
return self._poll(timeout)
File “/miniconda/envs/iris/lib/python3.6/multiprocessing/connection.py”, line 414, in _poll
r = wait([self], timeout)
File “/miniconda/envs/iris/lib/python3.6/multiprocessing/connection.py”, line 911, in wait
ready = selector.select(timeout)
File “/miniconda/envs/iris/lib/python3.6/selectors.py”, line 376, in select
fd_event_list = self._poll.poll(timeout)
File “/miniconda/envs/iris/lib/python3.6/site-packages/torch/utils/data/_utils/signal_handling.py”, line 66, in handler
_error_if_any_worker_fails()
RuntimeError: DataLoader worker (pid 17423) is killed by signal: Segmentation fault.

I tried to analyze the root-cause of this error … in my case this error comes in the following scenario:
Whenever I send the metrics (using tensorboard or equivalent) … I get the above segmentation fault, once the training resumes …

I use a pre-condition to check if its the master-worker and send the metrics only using the master-process … once the training resumes I get the SegFault Error … Is there a way to prevent the DataLoader to use master worker for dataloading … or is there a different way in which we can address this issue … Please let me know …

The Error is also reported sometimes in a different was as below:
File “dist_pytorch/train_simple.py”, line 112, in train
for data in iter(dataloader):
File “/miniconda/envs/iris/lib/python3.7/site-packages/torch/utils/data/dataloader.py”, line 517, in next
data = self._next_data()
File “/miniconda/envs/iris/lib/python3.7/site-packages/torch/utils/data/dataloader.py”, line 1182, in _next_data
idx, data = self._get_data()
File “/miniconda/envs/iris/lib/python3.7/site-packages/torch/utils/data/dataloader.py”, line 1148, in _get_data
success, data = self._try_get_data()
File “/miniconda/envs/iris/lib/python3.7/site-packages/torch/utils/data/dataloader.py”, line 986, in _try_get_data
data = self._data_queue.get(timeout=timeout)
File “/miniconda/envs/iris/lib/python3.7/multiprocessing/queues.py”, line 113, in get
return _ForkingPickler.loads(res)
File “/miniconda/envs/iris/lib/python3.7/site-packages/torch/multiprocessing/reductions.py”, line 282, in rebuild_storage_fd
fd = df.detach()
File “/miniconda/envs/iris/lib/python3.7/multiprocessing/resource_sharer.py”, line 57, in detach
with _resource_sharer.get_connection(self._id) as conn:
File “/miniconda/envs/iris/lib/python3.7/multiprocessing/resource_sharer.py”, line 87, in get_connection
c = Client(address, authkey=process.current_process().authkey)
File “/miniconda/envs/iris/lib/python3.7/multiprocessing/connection.py”, line 498, in Client
answer_challenge(c, authkey)
File “/miniconda/envs/iris/lib/python3.7/multiprocessing/connection.py”, line 742, in answer_challenge
message = connection.recv_bytes(256) # reject large message
File “/miniconda/envs/iris/lib/python3.7/multiprocessing/connection.py”, line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File “/miniconda/envs/iris/lib/python3.7/multiprocessing/connection.py”, line 407, in _recv_bytes
buf = self._recv(4)
File “/miniconda/envs/iris/lib/python3.7/multiprocessing/connection.py”, line 379, in _recv
chunk = read(handle, remaining)
ConnectionResetError: [Errno 104] Connection reset by peer

Could you explain what exactly you did which seems to cause the segfault? Based on your description I guess you are checking the worker id somehow inside the Dataset?

Hi,

I have not added anything to lead to a SegFault … I was only mentioning the program flow which I thought was somehow leading to this error … my guess might have been wrong …

I am attaching a sample training program which might explain the above condition …

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
import numpy as np
import argparse
import os
import turibolt as bolt
L2_Loss = nn.MSELoss()

---------------------------------------------------------------------------------------------------------------------

class SpeechDataset(Dataset):
def init(self):
self.T_MAX = 200
def len(self):
return 40000
def getitem(self, idx):
mels = np.random.randn(self.T_MAX, 512)
return torch.from_numpy(mels).type(torch.FloatTensor).transpose(0, 1)

---------------------------------------------------------------------------------------------------------------------

class MelsCollate():
“”" Zero-pads model inputs and targets based on number of frames per step
“”"
def init(self):
pass

def __call__(self, batch):
    """Collate's training batch from normalized text and mel-spectrogram
    PARAMS
    ------
    batch: [text_normalized, mel_normalized]
    """
    # Right zero-pad all mel sequences to max mel length
    melseq_lengths, ids_sorted_decreasing = torch.sort(
        torch.LongTensor([x.size(1) for x in batch]),
        dim=0, descending=True)
    max_mel_len = melseq_lengths[0]
    mels_padded = torch.FloatTensor(len(batch), 512, max_mel_len)

    # initialize with zeros
    mels_padded.zero_()
    # copy
    for i in range(len(ids_sorted_decreasing)):
        mels = batch[ids_sorted_decreasing[i]]
        mels_padded[i, :, :mels.size(1)] = mels
    return mels_padded

----------------------------------------------------------------------------------------------------------------------

class BasicConvBlock(nn.Module):
def init(self, ndim_in, ndim_out, kernel_size, dilation_factor=1):
super(BasicConvBlock, self).init()
self.relu = nn.ReLU()
padding=int((kernel_size - 1) / 2)*dilation_factor
self.conv1 = nn.Conv1d(ndim_in, ndim_out, kernel_size, stride=1, dilation=dilation_factor, padding=padding)

def forward(self, input):
    output = self.relu(self.conv1(input)) + input
    return output

----------------------------------------------------------------------------------------------------------------------

class ConvStack(nn.Module):
def init(self, ndim_in, ndim_out):
super(ConvStack, self).init()
self.dilation_factors = [1, 2, 4, 8, 16, 1, 2, 4, 8, 16]
self.conv_stack = nn.ModuleList([BasicConvBlock(ndim_in=ndim_in, ndim_out=ndim_out, kernel_size=21, dilation_factor=dilation_factor)
for dilation_factor in self.dilation_factors])

def forward(self, input):
    output = input
    for conv_block in self.conv_stack:
        output = conv_block(output)
    return output

----------------------------------------------------------------------------------------------------------------------

def train_step(data, model, optimizer, scheduler, device):
data = data.to(device)
optimizer.zero_grad()
pred_mels = model(data)
total_loss = L2_Loss(pred_mels, data)
total_loss.backward()
optimizer.step()
scheduler.step()
return total_loss

----------------------------------------------------------------------------------------------------------------------

def train(local_rank, send_bolt_metrics):
torch.cuda.set_device(local_rank)
num_epochs = 5

#setup the distributed backend for managing the distributed training
torch.distributed.init_process_group('nccl')

# dataset
dataset = SpeechDataset()
dist_sampler = DistributedSampler(dataset)
dataloader = DataLoader(dataset, batch_size=16, num_workers=2, sampler=dist_sampler, collate_fn=MelsCollate())

# model
model = ConvStack(512, 512)

#set the cuda device to a GPU allocated to current process
device = torch.device('cuda', local_rank)
model = model.to(device)
model = torch.nn.parallel.DistributedDataParallel(model,  device_ids=[local_rank],
                                                      output_device=local_rank)
optimizer = torch.optim.Adam(model.parameters(), lr=0.0003, betas=(0.9, 0.98), eps=1e-09)
scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[i*100000 for i in range(1, 6)], gamma=0.5)

step = 0
for epoch in range(num_epochs):
    for data in iter(dataloader):
        loss = train_step(data, model, optimizer, scheduler, device)
        step += 1
        if step % 10 == 0 and local_rank==0:
            print(epoch, step, loss.detach().cpu().numpy())
            # send metrics
            if send_bolt_metrics:
                bolt.send_metrics({'loss': loss.detach().cpu().item()})
            # save model
            torch.save(model.state_dict, 'checkpoints/model_' + str(step) + '.pt')

#prase the local_rank argument from command line for the current process
parser = argparse.ArgumentParser()
parser.add_argument("–local_rank", default=0, type=int)
parser.add_argument("–send_bolt_metrics", default=False, action=‘store_true’)
args = parser.parse_args()
os.makedirs(‘checkpoints’, exist_ok=True)
train(args.local_rank, args.send_bolt_metrics)

The error occurs when the send_bolt_metrics=True … otherwise the training seems to proceed fine …

bolt.send_metrics is a mechanism via which the metrics are sent to a webpage for visualization … similar to tensorboard …

Somehow it seems as if “connection” is reset by this bolt.send_metrics which gives rise to this error: “connection reset by peer” …

I am not sure how to avoid this …