I’m building a dataset where there are choice ‘sessions’ that each include many rows. I can’t load all the data into memory at once, so I’ve been trying to load chunks of ~2500 sessions into memory at a time and then using some logic in the
__getitem__ to load the next chunk into memory once it’s iterated through the current chunk.
It works with
num_workers=0, but when trying to do it with multiprocessing it runs into issues (
DataLoader worker is killed by signal), presumably because multiple threads start trying to load the next chunk of data into memory at once.
Any thoughts on how to fix this? Current code below!
class ChoiceDataset(Dataset):
    """Map-style dataset over choice sessions stored in a parquet file.

    Sessions are cached in memory one chunk (``chunksize`` session ids) at a
    time; ``__getitem__`` swaps in the correct chunk on demand when the
    requested session is not in the cached chunk.

    NOTE(review): this lazy chunk-swapping is NOT safe with DataLoader
    ``num_workers > 0`` — every worker process gets its own copy of the
    dataset and will re-read overlapping chunks independently, multiplying
    memory use (the likely cause of "DataLoader worker is killed by signal").
    Prefer assigning each worker a disjoint session-id range via
    ``torch.utils.data.get_worker_info()``, or use an ``IterableDataset``.

    Parameters
    ----------
    filename : str
        Path to the parquet file; must contain a ``session_id`` column.
    chunksize : int
        Number of session ids per in-memory chunk.
    choice_var : str
        Column holding the choice indicator (>0 marks the chosen item).
    feature_vars : list[str]
        Feature column names.
    fe_col : str
        Fixed-effect id column name.
    n_y : int
        Number of items.
    n_sessions : int
        Total number of sessions (dataset length).
    """

    def __init__(self, filename, chunksize, choice_var, feature_vars, fe_col, n_y, n_sessions):
        # Dimensions
        self.n_y = n_y                # number of items
        self.n_x = len(feature_vars) # number of features
        self.length = n_sessions     # total sessions

        # Column names
        self.choice_var = choice_var
        self.features = feature_vars
        self.fe_col = fe_col  # BUG FIX: was never stored, but __getitem__ reads self.fe_col
        self.chunksize = chunksize

        # Sessions -- read in just the first chunk for now
        self.filename = filename
        filters = [("session_id", "<", chunksize)]
        self.sessions = pd.read_parquet(filename, filters=filters).set_index('session_id')

    def _load_chunk_for(self, session_id):
        """Replace the cached chunk with the one containing *session_id*."""
        # BUG FIX: original used fixed bounds (>= self.chunksize) rather than
        # the boundaries of the chunk that actually contains session_id.
        chunk_start = (session_id // self.chunksize) * self.chunksize
        filters = [
            ("session_id", ">=", chunk_start),
            ("session_id", "<", chunk_start + self.chunksize),
        ]
        # BUG FIX: original computed `filters` but then read the WHOLE file
        # (pd.read_parquet called without filters), defeating the chunking.
        self.sessions = pd.read_parquet(self.filename, filters=filters).set_index('session_id')

    def __getitem__(self, idx):
        session_id = idx
        # EAFP: try the cached chunk first; on a miss, load the right chunk.
        try:
            session = self.sessions.loc[session_id]
        except KeyError:  # BUG FIX: bare `except:` hid unrelated errors
            self._load_chunk_for(session_id)
            session = self.sessions.loc[session_id]

        # Features (BUG FIX: removed duplicated session_x assignment)
        session_x = torch.tensor(session[self.features].values)
        session_x_fe = torch.LongTensor(session[self.fe_col].values)

        # Index of the chosen item (exactly one row with choice_var > 0 assumed
        # per session -- .item() raises otherwise; TODO confirm with data)
        label = torch.tensor(np.argwhere(session[self.choice_var].values > 0).item())
        return (session_x, session_x_fe, label)

    def __len__(self):
        return self.length