Dataset Wrapper Class for Parallel Reads of HDF5 via Multiprocessing

I am needing to manage a large amount of physiological waveform data, like ECGs, and so far have found HDF5 to be the best for compatibility with Python, PyTorch, Pandas, etc. The ability to slice/query/read only certain rows of a dataset is particularly appealing. However, I am struggling to develop a stable wrapper class which allows for simple yet reliable parallel reads from many multiprocessing workers, such as the case with PyTorch dataset/dataloader. I am fine with single threaded writes as I only have to ETL my source data into the HDF5 once, but lacking parallel reads really hurts run times during data analysis, PyTorch model runs, etc. I have tried both PyTables via Pandas and h5py with the following scenarios:

  • One large HDF5 file
    • Could only get single process reads stable. mp pretty quickly corrupted the entire file.
  • One HDF5 file per time series interval
    • ie, hundreds of millions of files. Still having collisions leading to corrupted files even when opening read only with a context manager. Also means backing up or transferring so many files is very slow due to the high IO cost.

Apologies for no code samples, but I’ve tried dozens of approaches over the last few months (each was hundreds of lines) and eventually always ran into stability issues. Is there any good example code of how to build a dataset wrapper class backed by HDF5? AFAIK, h5py's MPI-based “Parallel HDF5” is not relevant since this is on one machine, and “Single Writer Multiple Reader” is not necessary as I am fine writing single-threaded before any reading. Ideally, I’d hope that whenever I need data in my project, I could just do similar to the following:

import dataset  # my HDF5 dataset wrapper class
import multiprocessing as mp

def dataloader(idxs):
  temp = []
  ds = dataset.Dataset()
  for _, idx in idxs.iterrows():
    df = ds.get_data(idx)
    # do some data wrangling in parallel
    temp.append(df)
  return temp

pool = mp.Pool(6)
data = pool.map(dataloader, indices)

Many thanks!