DataLoader (with num_workers>0) gives errors when loading only subsets of the data into memory at a time

I’m building a dataset of choice ‘sessions’ that each include many rows. I can’t load all the data into memory at once, so I’ve been trying to load chunks of ~2500 sessions into memory at a time, with some logic in __getitem__ to load the next chunk into memory once the current chunk has been iterated through.

It works with num_workers=0, but with multiprocessing it runs into issues (DataLoader worker is killed by signal), presumably because multiple worker processes start trying to load the next chunk of data into memory at once.
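For reference, here is a tiny standalone probe (separate from my code below, names made up) showing that each worker process runs its own copy of the Dataset and only sees a slice of the indices:

from torch.utils.data import DataLoader, Dataset, get_worker_info

class Probe(Dataset):
    """Minimal dataset that reports which worker serves each index."""

    def __len__(self):
        return 8

    def __getitem__(self, idx):
        info = get_worker_info()  # None in the main process
        worker = info.id if info is not None else "main"
        print(f"idx {idx} served by worker {worker}")
        return idx

if __name__ == "__main__":
    # With num_workers=2, batches of indices alternate between the two
    # workers, so each worker ends up paging chunks in and out independently.
    for _ in DataLoader(Probe(), num_workers=2):
        pass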

Any thoughts on how to fix this? Current code below!

import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset


class ChoiceDataset(Dataset):
    def __init__(self, filename, chunksize, choice_var, feature_vars, fe_col, n_y, n_sessions):
        # Dimensions 
        self.n_y = n_y # items
        self.n_x = len(feature_vars) # features
        self.length = n_sessions # total sessions 

        # Features 
        self.choice_var = choice_var
        self.features = feature_vars
        self.fe_col = fe_col
        self.chunksize = chunksize
                                                    
        # Sessions -- read in just a chunk for now 
        self.filename = filename
        filters = [("session_id", "<", chunksize)]
        self.sessions = pd.read_parquet(filename, filters=filters).set_index('session_id')
        
    def __getitem__(self, idx):
        
        session_id = idx        
        
        # If in memory, grab the session; otherwise load the chunk containing it first
        # (catching the KeyError is faster than checking membership every time)
        try:
            session = self.sessions.loc[session_id]
        except KeyError:  # .loc should only fail if the session is not in memory
            chunk_start = (session_id // self.chunksize) * self.chunksize
            filters = [("session_id", ">=", chunk_start),
                       ("session_id", "<", chunk_start + self.chunksize)]
            self.sessions = pd.read_parquet(self.filename, filters=filters).set_index('session_id')
            session = self.sessions.loc[session_id]
        
        # Pull out the features 
        session_x = torch.tensor(session[self.features].values)
        session_x_fe = torch.LongTensor(session[self.fe_col].values)
        
        # Get the chosen item
        label = torch.tensor(np.argwhere(session[self.choice_var].values>0).item())

        return (session_x, session_x_fe, label)

    def __len__(self):
        return self.length

I’m not really familiar with parquet, but there seem to be some libraries that provide an interface for loading this data type, e.g. PyParquetLoaders, so maybe they would be helpful.
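For the chunking problem itself, another option might be an IterableDataset that reads the parquet row groups directly via pyarrow, striding over them by worker id so that no two workers ever load the same chunk. A rough sketch, assuming the file was written such that sessions never span a row-group boundary (the class name and arguments are placeholders mirroring yours):

import pyarrow.parquet as pq
import torch
from torch.utils.data import IterableDataset, get_worker_info

class ChoiceIterableDataset(IterableDataset):
    def __init__(self, filename, choice_var, feature_vars, fe_col):
        self.filename = filename
        self.choice_var = choice_var
        self.features = feature_vars
        self.fe_col = fe_col

    def __iter__(self):
        pf = pq.ParquetFile(self.filename)
        info = get_worker_info()
        start = info.id if info is not None else 0          # this worker's first row group
        step = info.num_workers if info is not None else 1  # stride over row groups
        for rg in range(start, pf.num_row_groups, step):
            # One row group is one in-memory chunk, loaded by exactly one worker
            chunk = pf.read_row_group(rg).to_pandas()
            for _, session in chunk.groupby('session_id'):
                session_x = torch.tensor(session[self.features].values)
                session_x_fe = torch.as_tensor(session[self.fe_col].values, dtype=torch.long)
                # Index of the chosen item (assumes exactly one positive entry)
                label = torch.tensor(int(session[self.choice_var].values.argmax()))
                yield session_x, session_x_fe, label

Since the dataset already yields per-session tensors, you would pass batch_size=None to the DataLoader to disable automatic batching.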

This looks great — I’ll give it a try and report back.

Out of curiosity, is there an alternative storage solution that you think would work better?