Strange error when parsing JSON in a multi-worker data loader

Hi, we have enabled the multi-worker data loader to load 10K+ training data files, and the speed is pretty good with multiple workers. However, when we also try to leverage the multiple workers to not only read the data line by line but also parse each line into a JSON dict, we hit this problem:

ERROR: Unexpected segmentation fault encountered in worker.
Traceback (most recent call last):
  File "/home/miniconda/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 480, in _try_get_batch
    data = self.data_queue.get(timeout=timeout)
  File "/home/miniconda/lib/python3.6/multiprocessing/queues.py", line 104, in get
    if not self._poll(timeout):
  File "/home/miniconda/lib/python3.6/multiprocessing/connection.py", line 257, in poll
    return self._poll(timeout)
  File "/home/miniconda/lib/python3.6/multiprocessing/connection.py", line 414, in _poll
    r = wait([self], timeout)
  File "/home/miniconda/lib/python3.6/multiprocessing/connection.py", line 911, in wait
    ready = selector.select(timeout)
  File "/home/miniconda/lib/python3.6/selectors.py", line 376, in select
    fd_event_list = self._poll.poll(timeout)
  File "/home/miniconda/lib/python3.6/site-packages/torch/utils/data/_utils/signal_handling.py", line 65, in handler
    _error_if_any_worker_fails()
RuntimeError: DataLoader worker (pid 95106) is killed by signal: Segmentation fault.

Searching around seems to point to a shared memory problem. However, I think our system has sufficient shared memory.

Another interesting point: when I just read the data line by line, I do not have this issue:

with open(current_file, mode='rb') as f:
    text = f.read().decode('utf-8')
    all_data.extend(text.split('\n'))

but if I add JSON parsing logic after reading line by line, it reports this error:

with open(current_file, mode='rb') as f:
    text = f.read().decode('utf-8')
    all_data.extend(text.split('\n'))

json_data = []
for line in all_data:
    try:
        json_data.append(json.loads(line))
    except:
        break
return json_data
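
For completeness, a cleaned-up version of the same parsing logic (the helper name is just a placeholder, not from my actual code), which skips empty or malformed lines instead of stopping at the first failure:

import json

def load_json_lines(current_file):
    # Placeholder helper: return one parsed dict per non-empty JSON line.
    with open(current_file, mode='rb') as f:
        text = f.read().decode('utf-8')

    json_data = []
    for line in text.split('\n'):
        if not line:
            continue
        try:
            json_data.append(json.loads(line))
        except json.JSONDecodeError:
            # Skip malformed lines rather than silently stopping the loop.
            continue
    return json_data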

So I wonder whether this is indeed a shared memory issue? The error message is very rough. I also wonder what shared memory is actually used for in the multi-worker data loader case: shouldn't the worker processes run independently and just provide data to the GPU in the end? I don't see much need to share large chunks of memory besides some worker coordination.

Are you seeing the same error if you reduce the number of workers?
If not, then it could point to an insufficient amount of shared memory. Could you check how much shared memory your system is using at the moment?

Each worker is a process, which needs to communicate with the main process, e.g. via shared memory. Multi-threading is not really an option in Python due to the GIL.
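
A quick way to see that connection (a minimal sketch of my own, not from your code; the shared-memory behavior is assumed for CPU tensors with num_workers > 0):

import torch
from torch.utils.data import DataLoader, TensorDataset

def main():
    # Toy dataset; the point is only to get a batch back from a worker process.
    dataset = TensorDataset(torch.randn(100, 8))
    loader = DataLoader(dataset, batch_size=4, num_workers=2)

    batch, = next(iter(loader))
    # Batches produced by worker processes are expected to be backed by
    # shared memory, which is why /dev/shm usage grows with the number of
    # workers and the batch size.
    print(batch.is_shared())

if __name__ == '__main__':
    main()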

Thanks @ptrblck

Yes, I am seeing this error even with the number of workers reduced to 2, and I believe I have enough shared memory, as shown below (I confirmed our training platform adds all memory as shared memory).

BTW, if I have N workers, does that mean I need O(N * size(batch)) shared memory?


------ Messages Limits --------
max queues system wide = 32000
max size of message (bytes) = 8192
default max size of queue (bytes) = 16384

------ Shared Memory Limits --------
max number of segments = 4096
max seg size (kbytes) = 18014398509465599
max total shared memory (kbytes) = 18014398509481980
min seg size (bytes) = 1

------ Semaphore Limits --------
max number of arrays = 32000
max semaphores per array = 32000
max semaphores system wide = 1024000000
max ops per semop call = 500
semaphore max value = 32767

Also, I have a general question regarding multiprocessing in PyTorch: if I already have a multi-worker dataloader, and I start my main training process following

https://pytorch.org/docs/stable/notes/multiprocessing.html#multiprocessing-best-practices

In this case, the main process will spin off multiple processes, and the data loader will also have multiple processes. Will the dataloader and the training share the same processes underneath in the OS, or do both of them create new processes?

import torch.multiprocessing as mp
from model import MyModel

def train(model):
    # Construct data_loader, optimizer, etc.
    for data, labels in data_loader:
        optimizer.zero_grad()
        loss_fn(model(data), labels).backward()
        optimizer.step()  # This will update the shared parameters

if __name__ == '__main__':
    num_processes = 4
    model = MyModel()
    # NOTE: this is required for the ``fork`` method to work
    model.share_memory()
    processes = []
    for rank in range(num_processes):
        p = mp.Process(target=train, args=(model,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

Updating the thread again: it seems JSON parsing introduces a lot of memory overhead in Python, and I suspect that is the problem; JSON parsing is also very slow. Is there a better JSON library you would recommend? I am not sure whether it is the JSON memory overhead that causes the multi-worker shm error. How do we estimate how much shm is needed for our case?

The DataLoader processes should be independent from the DistributedDataParallel processes.
You could try to use pandas to read and process your JSON data (I’m always using it, as it’s quite easy to further process the DataFrames).

Let me know if that helps.

Thanks, I will try pandas. Our training records are split across multiple files; each file contains multiple lines, and each line is a JSON string. So there is a little more work to do on the JSON side, but I will definitely try pandas.
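
For our layout (one JSON object per line), I assume something like this should work with pandas; the file path here is just a hypothetical example:

import pandas as pd

current_file = 'train_part_000.json'  # hypothetical path to one JSON Lines file

# Parse the whole JSON Lines file in a single call.
df = pd.read_json(current_file, lines=True)
records = df.to_dict(orient='records')  # back to a list of dicts if needed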

BTW, regarding "The DataLoader processes should be independent from the DistributedDataParallel processes": let's say we have 32 data loader workers and we create 16 processes for our training. Is it then 48 processes running, or is it a bit more complicated so they cannot simply be added together?

It might depend on how you are using DistributedDataParallel.
The fastest and recommended way would be to create a process for each GPU, which would also mean that these processes use their own DataLoaders.
For 8 GPUs (8 processes in DDP) and e.g. num_workers=2, you should see 8*2=16 worker processes.
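
A rough sketch of that layout (my own example, not from your code; it assumes one process per GPU and uses a toy TensorDataset):

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.utils.data import DataLoader, DistributedSampler, TensorDataset

def run(rank, world_size):
    # One DDP process per GPU; each process builds its own DataLoader,
    # so its num_workers=2 workers belong to this process only.
    os.environ.setdefault('MASTER_ADDR', '127.0.0.1')
    os.environ.setdefault('MASTER_PORT', '29500')
    dist.init_process_group('nccl', rank=rank, world_size=world_size)

    dataset = TensorDataset(torch.randn(1024, 8), torch.randint(0, 2, (1024,)))
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    loader = DataLoader(dataset, batch_size=32, sampler=sampler, num_workers=2)

    # ... build the model, wrap it in DistributedDataParallel, and train here ...
    dist.destroy_process_group()

if __name__ == '__main__':
    world_size = torch.cuda.device_count()  # e.g. 8 GPUs -> 8 DDP processes
    mp.spawn(run, args=(world_size,), nprocs=world_size)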

Got it, thanks. I tried pandas and it is much faster, but I still get the same segmentation fault error after a while. So I wonder: is there some way to calculate how much shared memory I need? Given the number of workers and the training data size, can we roughly estimate the required shared memory?

I don’t know if this is really related to shared memory, as your system seems to have enough.
Could you run the code via:

$ gdb --args python my_script.py
...
Reading symbols from python...done.
(gdb) run
...
(gdb) backtrace
...

and post the backtrace here?

Thanks @ptrblck

Very interesting: I did not get any backtrace.


(gdb) run
Starting program: /home/miniconda/bin/python performance.py
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
[New Thread 0x7fffa60a6700 (LWP 61963)]
[New Thread 0x7fffa58a5700 (LWP 61964)]
[New Thread 0x7fffa10a4700 (LWP 61965)]
[New Thread 0x7fff9e8a3700 (LWP 61966)]
[New Thread 0x7fff9c0a2700 (LWP 61967)]
[New Thread 0x7fff998a1700 (LWP 61968)]
[New Thread 0x7fff970a0700 (LWP 61969)]
[New Thread 0x7fff9489f700 (LWP 61970)]
[New Thread 0x7fff9409e700 (LWP 61971)]
[New Thread 0x7fff8f89d700 (LWP 61972)]
[New Thread 0x7fff8d09c700 (LWP 61973)]
[New Thread 0x7fff8a89b700 (LWP 61974)]
[New Thread 0x7fff8809a700 (LWP 61975)]
[New Thread 0x7fff85899700 (LWP 61976)]
[New Thread 0x7fff83098700 (LWP 61977)]
[New Thread 0x7fff80897700 (LWP 61978)]
[New Thread 0x7fff7e096700 (LWP 61979)]
[New Thread 0x7fff7d895700 (LWP 61980)]
[New Thread 0x7fff7b094700 (LWP 61981)]
[New Thread 0x7fff78893700 (LWP 61982)]
[New Thread 0x7fff74092700 (LWP 61983)]
[New Thread 0x7fff71891700 (LWP 61984)]
[New Thread 0x7fff6f090700 (LWP 61985)]
[Thread 0x7fff7e096700 (LWP 61979) exited]
[Thread 0x7fff6f090700 (LWP 61985) exited]
[Thread 0x7fff74092700 (LWP 61983) exited]
[Thread 0x7fff7b094700 (LWP 61981) exited]
[Thread 0x7fff80897700 (LWP 61978) exited]
[Thread 0x7fff83098700 (LWP 61977) exited]
[Thread 0x7fff85899700 (LWP 61976) exited]
[Thread 0x7fff8809a700 (LWP 61975) exited]
[Thread 0x7fff8a89b700 (LWP 61974) exited]
[Thread 0x7fff8d09c700 (LWP 61973) exited]
[Thread 0x7fff8f89d700 (LWP 61972) exited]
[Thread 0x7fff9409e700 (LWP 61971) exited]
[Thread 0x7fff9489f700 (LWP 61970) exited]
[Thread 0x7fff970a0700 (LWP 61969) exited]
[Thread 0x7fff998a1700 (LWP 61968) exited]
[Thread 0x7fff9c0a2700 (LWP 61967) exited]
[Thread 0x7fff9e8a3700 (LWP 61966) exited]
[Thread 0x7fffa10a4700 (LWP 61965) exited]
[Thread 0x7fffa58a5700 (LWP 61964) exited]
[Thread 0x7fffa60a6700 (LWP 61963) exited]
[Thread 0x7fff71891700 (LWP 61984) exited]
[Thread 0x7fff78893700 (LWP 61982) exited]
[Thread 0x7fff7d895700 (LWP 61980) exited]
total_files = 5040.  //customer comments
[New Thread 0x7fff6f090700 (LWP 62006)]
[New Thread 0x7fff71891700 (LWP 62007)]
start to get data //customer comments
[New Thread 0x7fff74092700 (LWP 62008)]
start to get data
[New Thread 0x7fff78893700 (LWP 62009)]
start to get data
start to get data
current file len = 570 //customer comments
current file len = 566
current file len = 567
current file len = 567
start to get data
start to get data
start to get data
start to get data
current file len = 568
current file len = 569
current file len = 570
current file len = 570

ERROR: Unexpected segmentation fault encountered in worker.
ERROR: Unexpected segmentation fault encountered in worker.
Traceback (most recent call last):
  File "/home/zhrui/.local/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 761, in _try_get_data
    data = self._data_queue.get(timeout=timeout)
  File "/home/miniconda/lib/python3.6/multiprocessing/queues.py", line 104, in get
    if not self._poll(timeout):
  File "/home/miniconda/lib/python3.6/multiprocessing/connection.py", line 257, in poll
    return self._poll(timeout)
  File "/home/miniconda/lib/python3.6/multiprocessing/connection.py", line 414, in _poll
    r = wait([self], timeout)
  File "/home/miniconda/lib/python3.6/multiprocessing/connection.py", line 911, in wait
    ready = selector.select(timeout)
  File "/home/miniconda/lib/python3.6/selectors.py", line 376, in select
    fd_event_list = self._poll.poll(timeout)
  File "/home/zhrui/.local/lib/python3.6/site-packages/torch/utils/data/_utils/signal_handling.py", line 66, in handler
    _error_if_any_worker_fails()
RuntimeError: DataLoader worker (pid 62005) is killed by signal: Segmentation fault.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "performance.py", line 62, in <module>
    main()
  File "performance.py", line 48, in main
    for i,batch in enumerate(rl_data_loader):
  File "/home/zhrui/.local/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 345, in __next__
    data = self._next_data()
  File "/home/zhrui/.local/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 841, in _next_data
    idx, data = self._get_data()
  File "/home/zhrui/.local/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 808, in _get_data
    success, data = self._try_get_data()
  File "/home/zhrui/.local/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 774, in _try_get_data
    raise RuntimeError('DataLoader worker (pid(s) {}) exited unexpectedly'.format(pids_str))
RuntimeError: DataLoader worker (pid(s) 62005) exited unexpectedly

[Thread 0x7fff78893700 (LWP 62009) exited]
[Thread 0x7fff74092700 (LWP 62008) exited]
[Thread 0x7fff71891700 (LWP 62007) exited]
[Thread 0x7fff6f090700 (LWP 62006) exited]
[Inferior 1 (process 61952) exited with code 01]

(gdb) backtrace
No stack.

Actually, I am not using DistributedDataParallel; we are still on one machine with a single GPU but 24 CPU cores. Right now we don't think the GPU is the bottleneck, since the CPU may not be able to feed data quickly enough.

Updating the thread, following this GitHub issue:

It seems that if I put logic that is too complicated, like parsing JSON or iterating over a list, in __getitem__, it will fail; if I just put simple string-split logic there, it passes. This is the same as what someone mentioned in the GitHub thread.

So here comes my question: if I cannot put complicated data processing in the data loader, should I have separate multiprocessing logic in our main train.py and put some data-cooking work there to leverage multiprocessing in PyTorch? What is the best practice?
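
To make the question concrete, here is a rough sketch of what I mean by keeping __getitem__ simple and doing the heavy parsing once up front (the class name and file layout are just placeholders):

import json
from torch.utils.data import Dataset

class PreparsedJsonDataset(Dataset):
    """Placeholder sketch: parse every JSON line once, up front, so that
    __getitem__ inside the workers is only a cheap list lookup."""

    def __init__(self, file_paths):
        self.samples = []
        for path in file_paths:
            with open(path, mode='rb') as f:
                for line in f.read().decode('utf-8').split('\n'):
                    if line:
                        self.samples.append(json.loads(line))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        return self.samples[idx]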

Nevertheless, I think that GitHub bug should not be closed; the issue is still there.

Did you try all suggestions in the linked issue, such as increasing the number of segments?

Will try, thanks. It is a little hard to increase the number of segments. However, I tested with a smaller file size and fewer files and still hit the same problem. 4096 segments definitely looks like enough for me, and my problem happens almost right after the dataloader starts, so I don't believe the data has filled up shm yet.

I am trying to use multiprocessing not only in the data loader but also in the general training program, so training will have N processes and each process will have M dataloader workers, N x M processes in total. But I am not sure whether the dataloaders of all these independent processes can coordinate to read the correct data indices.

Updating this thread a bit: I refactored the training code to use PyTorch multiprocessing to speed up some data processing on the CPU side (in order to feed the GPU faster): https://pytorch.org/docs/stable/notes/multiprocessing.html#multiprocessing-best-practices

In each process function, I also use a multi-worker data loader to speed up the data loading time: https://pytorch.org/docs/stable/data.html

I put my heavy CPU JSON parsing not in the dataloader but in the main training process, and the problem seems to be gone. I don't know why, but it seems to be working. I do have a follow-up question: suppose I have N processes, each with M dataloader workers, so N x M workers underneath in total.

If, in my dataloader, I want to get all data by index, meaning that __getitem__(self, idx) in the M dataloader workers across the N different processes should work together to process different indices, how can I ensure they do not process duplicate indices or miss some?

I’m not sure I understand the question completely.
The DataLoader will use multiple processes and make sure that each worker gets unique indices.
Are you using another multiprocessing wrapper on top of multiple DataLoaders?
If so, what’s the reason for it? Couldn’t you increase the number of workers in a DataLoader instead?

Thanks @ptrblck

Yes, I am using a multiprocessing setup like this:

import torch
import torch.distributed
import torch.multiprocessing as mp

if __name__ == "__main__":
    num_processes = 8

    can_do_distributed = torch.distributed.is_available() and torch.cuda.is_available()
    # args must be a tuple; params is defined elsewhere in the training script
    mp.spawn(fn=single_process_main, nprocs=num_processes, args=(params,))

and in single_process_main:

def single_process_main(rank, params):
    # mp.spawn passes the process index (rank) as the first argument
    data_loader = DataLoader(dataset, num_workers=4, batch_size=1)
    for epoch in range(epochs):
        # some JSON parsing logic
        data_cook()
        train()

So we leverage multiprocessing, and each process then has a multi-worker dataloader; I assume we end up with 8 x 4 = 32 workers underneath. I know the 4 workers of one dataloader will fetch different data, but I am not sure whether the dataloaders of the 8 different processes will fetch different data; it seems they do not.

The reason I am using this is that if I put the JSON parsing in __getitem__ in the dataloader, it produces the error indicated above; this multiprocessing plus multi-worker setup seems to resolve the problem.

If you need this workflow to avoid the JSON issue, you could use a SubsetRandomSampler for each DataLoader, so that each DataLoader only loads a subset of the data.
However, I’m not sure how train is used.
Are you training 8 models simultaneously?
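
A rough sketch of the SubsetRandomSampler idea (my own example; dataset, rank, and num_processes would come from your surrounding training code):

import numpy as np
from torch.utils.data import DataLoader, SubsetRandomSampler

def make_loader_for_rank(dataset, rank, num_processes, batch_size=1, num_workers=4):
    # Split the full index range into num_processes roughly equal chunks and
    # let this process sample only from its own chunk, so the per-process
    # DataLoaders never hand out overlapping indices.
    chunk = np.array_split(np.arange(len(dataset)), num_processes)[rank]
    sampler = SubsetRandomSampler(chunk.tolist())
    return DataLoader(dataset, batch_size=batch_size,
                      sampler=sampler, num_workers=num_workers)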

No, actually I only train one model. The train function is a simple NN, and I want to use multiprocessing to process different segments of the training data (similar to the multi-worker data loader approach), but it looks like it is not easy to do so.