I’m building a dataset where there are choice ‘sessions’ that each include many rows. I can’t load all the data into memory at once, so I’ve been trying to load chunks of ~2500 sessions into memory at a time and then using some logic in the __getitem__
to load the next chunk into memory once it’s iterated through the current chunk.
It works with num_workers=0, but with multiprocessing (num_workers > 0) it fails ("DataLoader worker is killed by signal"), presumably because multiple worker processes each start trying to load the next chunk of data into memory at once.
Any thoughts on how to fix this? Current code is below!
class ChoiceDataset(Dataset):
    """Map-style dataset over choice sessions stored in a parquet file.

    Each session spans multiple rows sharing one ``session_id``. To bound
    memory, only one chunk of ``chunksize`` consecutive session ids is kept
    resident; ``__getitem__`` transparently swaps in the chunk containing a
    requested session when it is not already loaded.

    NOTE(review): with ``DataLoader(num_workers > 0)`` every worker process
    holds its own copy of this object, so a shuffled sampler will make each
    worker thrash through chunks independently. Use a sampler that keeps a
    chunk's session ids together per worker (or an IterableDataset with
    per-worker sharding) when using multiprocessing.
    """

    def __init__(self, filename, chunksize, choice_var, feature_vars, fe_col, n_y, n_sessions):
        # Dimensions
        self.n_y = n_y                # number of items (alternatives)
        self.n_x = len(feature_vars)  # number of features per row
        self.length = n_sessions      # total sessions across the whole file
        # Column names
        self.choice_var = choice_var
        self.features = feature_vars
        self.fe_col = fe_col          # BUG FIX: was never stored, but read in __getitem__
        self.chunksize = chunksize
        # Sessions -- read in just the first chunk for now.
        self.filename = filename
        filters = [("session_id", "<", chunksize)]
        self.sessions = pd.read_parquet(filename, filters=filters).set_index('session_id')

    def _load_chunk(self, session_id):
        """Replace the resident chunk with the one containing ``session_id``."""
        # BUG FIX: the original computed filters relative to self.chunksize
        # (not the requested session) and then IGNORED them, reading the
        # entire parquet file into memory — the likely cause of workers
        # being killed. Align the chunk to the requested session instead.
        start = (session_id // self.chunksize) * self.chunksize
        filters = [("session_id", ">=", start),
                   ("session_id", "<", start + self.chunksize)]
        self.sessions = pd.read_parquet(self.filename, filters=filters).set_index('session_id')

    def __getitem__(self, idx):
        session_id = idx
        # EAFP: assume the session is resident; on a miss, load its chunk.
        # Narrowed from the original bare ``except``, which would also have
        # hidden unrelated failures (e.g. the missing fe_col attribute).
        try:
            session = self.sessions.loc[session_id]
        except KeyError:
            self._load_chunk(session_id)
            session = self.sessions.loc[session_id]
        # Features: (rows_in_session, n_x) tensor. (The original built this
        # tensor twice; the duplicate line is removed.)
        session_x = torch.tensor(session[self.features].values)
        # Fixed-effect column as integer indices.
        session_x_fe = torch.LongTensor(session[self.fe_col].values)
        # Label: position of the single chosen row (choice_var > 0).
        label = torch.tensor(np.argwhere(session[self.choice_var].values > 0).item())
        return (session_x, session_x_fe, label)

    def __len__(self):
        # Total sessions in the file, not just those currently in memory.
        return self.length