Loading Big Parquet files

I have a directory of huge parquet files and have been using fastparquet to read them in, which works fine.
I want to extend the Dataset class to read them lazily, hoping to get better GPU utilisation.

Here are a few questions regarding the Dataset class:

  1. The __len__ method: should it return the number of training instances or the number of parquet files in the directory?
  2. The __getitem__ method: should it return just a single training row (an ndarray of shape (1, num_features)), or can it also return a matrix of rows (an ndarray of shape (num_rows_in_a_parquetfile, num_features))?

If it can return a matrix, how do I feed that into the DataLoader class?

Almost everything I have gathered from reading the forums is that people use it to load images (one instance per file). I am not sure how this works out for reading huge files with N rows each.

Thanks

The answer to both questions is: up to you.

Read through the data loading tutorial: http://pytorch.org/tutorials/beginner/data_loading_tutorial.html#sphx-glr-beginner-data-loading-tutorial-py

Also read up on how to pass a custom collate function to the DataLoader (some threads on the forums cover it).
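
For a concrete picture of the file-per-item design, here is a minimal sketch (the class and function names are made up, and it assumes every column is numeric): __len__ counts files, __getitem__ returns one whole file as a matrix, and a custom collate function concatenates those matrices along the row axis instead of stacking a new batch dimension.

    import glob
    import os

    import fastparquet
    import torch
    from torch.utils.data import Dataset, DataLoader

    class ParquetFilesDataset(Dataset):
        # One item per parquet file: __len__ counts files and
        # __getitem__ returns every row of one file at once.
        def __init__(self, directory):
            self.paths = sorted(glob.glob(os.path.join(directory, "*.parquet")))

        def __len__(self):
            return len(self.paths)

        def __getitem__(self, idx):
            df = fastparquet.ParquetFile(self.paths[idx]).to_pandas()
            # Assumes numeric-only columns; mixed dtypes would need
            # per-column handling before this conversion.
            return torch.from_numpy(df.values)

    def concat_collate(batch):
        # Each element is already a (rows, features) matrix, so
        # concatenate along the row axis.
        return torch.cat(batch, dim=0)

    loader = DataLoader(ParquetFilesDataset("data/"), batch_size=2,
                        collate_fn=concat_collate)

With this layout each "batch" is batch_size files' worth of rows; if you need fixed-size training batches you can re-batch the concatenated matrix downstream.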

Regarding the parquet files: in my case the major problem is that pyarrow.Table objects are not serializable between the worker processes of the DataLoader if you want to use num_workers >= 2. So it is probably something to consider for anyone trying to wrap parquet files with the Dataset interface.
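
One workaround people use (a sketch, assuming plain ndarrays are acceptable output) is to keep only picklable state such as file paths inside the dataset and let each worker read the file itself, so no pyarrow.Table ever has to cross the process boundary:

    import pyarrow.parquet as pq
    from torch.utils.data import Dataset

    class LazyParquetDataset(Dataset):
        # Store only picklable state (file paths). Each worker process
        # opens and reads the file itself, so no pyarrow.Table is ever
        # serialized between the parent and the workers.
        def __init__(self, paths):
            self.paths = paths

        def __len__(self):
            return len(self.paths)

        def __getitem__(self, idx):
            table = pq.read_table(self.paths[idx])  # runs inside the worker
            return table.to_pandas().values         # plain ndarrays pickle fine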

Did you find a workaround for this bit?

Hi, we are using a multi-worker DataLoader to read parquet files. Right now our design is that each __getitem__ call returns all the data from one file, but if the file is large (around the 2 GB level), it fails with something like:

 File "/home/zhrui/.local/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 761, in _try_get_data
    data = self._data_queue.get(timeout=timeout)
  File "/home/miniconda/lib/python3.6/multiprocessing/queues.py", line 104, in get
    if not self._poll(timeout):
  File "/home/miniconda/lib/python3.6/multiprocessing/connection.py", line 257, in poll
    return self._poll(timeout)
  File "/home/miniconda/lib/python3.6/multiprocessing/connection.py", line 414, in _poll
    r = wait([self], timeout)
  File "/home/miniconda/lib/python3.6/multiprocessing/connection.py", line 911, in wait
    ready = selector.select(timeout)
  File "/home/miniconda/lib/python3.6/selectors.py", line 376, in select
    fd_event_list = self._poll.poll(timeout)
  File "/home/zhrui/.local/lib/python3.6/site-packages/torch/utils/data/_utils/signal_handling.py", line 66, in handler
    _error_if_any_worker_fails()
RuntimeError: DataLoader worker (pid 90300) is killed by signal: Killed.

Is this because each DataLoader worker has a memory limit or a computation-time limit? @smth

We tried to return the DataFrame directly, but that is not allowed by the PyTorch DataLoader; if we convert it to a numpy array or a dict, we run into the error above.
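
"killed by signal: Killed" usually means the kernel's OOM killer stopped the worker, which fits reading a whole 2 GB file per __getitem__ in several workers at once. One way to shrink the per-item footprint (a sketch; RowGroupDataset is a hypothetical name) is to index parquet row groups instead of whole files:

    import pyarrow.parquet as pq
    from torch.utils.data import Dataset

    class RowGroupDataset(Dataset):
        # Index (file, row_group) pairs so each __getitem__ materialises
        # only one row group of a large file instead of the whole thing.
        def __init__(self, paths):
            self.index = []
            for path in paths:
                n_groups = pq.ParquetFile(path).num_row_groups
                self.index.extend((path, g) for g in range(n_groups))

        def __len__(self):
            return len(self.index)

        def __getitem__(self, idx):
            path, group = self.index[idx]
            table = pq.ParquetFile(path).read_row_group(group)
            return table.to_pandas().values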

I have checked the source code for the DataLoader; basically we could set a timeout:

   def _try_get_data(self, timeout=_utils.MP_STATUS_CHECK_INTERVAL):
        # Tries to fetch data from `self._data_queue` once for a given timeout.
        # This can also be used as inner loop of fetching without timeout, with
        # the sender status as the loop condition.
        #
        # This raises a `RuntimeError` if any worker died unexpectedly. This error
        # can come from either the SIGCHLD handler in `_utils/signal_handling.py`
        # (only for non-Windows platforms), or the manual check below on errors
        # and timeouts.
        #
        # Returns a 2-tuple:
        #   (bool: whether successfully get data, any: data if successful else None)
        try:
            data = self._data_queue.get(timeout=timeout)

The default is 5 seconds:

MP_STATUS_CHECK_INTERVAL = 5.0
r"""Interval (in seconds) to check status of processes to avoid hanging in
    multiprocessing data loading. This is mainly used in getting data from
    another process, in which case we need to periodically check whether the
    sender is alive to prevent hanging."""

What does this interval mean? I tried increasing the timeout in the multi-worker definition but got the same error; the only difference is that the final error message is:

RuntimeError: DataLoader worker (pid 320) is killed by signal: Segmentation fault.