Hello @JamesTrick
Thank you for the advice. I had seen that tutorial but thought it didnt apply because it was about mapped datasets which I understand doesnt translate well to streaming data. I tried setting it back to a plain Dataset but unfortunately I would have no idea how to set the length in the case when I am getting real live sensor data in real time which was what I was hoping Iterable Datasets would help solve.
The real goal is to set up a training flow to accept real time streaming live data with no discernible end beforehand.
I have taken the advice to return a dict as stated in the tutorial but it complains that its not getting an iterable
I am calling it from the following snippet (between the ~~~~~~~~~) in the training function and it returns the following error message (never prints)
TypeError: iter() returned non-iterator of type ‘dict’
trainer = datasets.MyLSLIterableDataset(
path_list=path_list_train)
for i, (features, labels) in enumerate(trainer):
print('entered first enumeration')
My dataset implementation is :
class MyLSLIterableDataset(torch.utils.data.IterableDataset):
def __init__(self, num_columns, chunk_size=512, path_list=None): #, start, end):
super(torch.utils.data.dataset.IterableDataset).__init__()
self.resolver = ContinuousResolver()
self.inlets = []
self.path_list = path_list
self.already_given = 0
self.path_no = 0
self.chunk_size = chunk_size
def __iter__(self):
#take data from buffer, lets say a chunk is 1000 buffered samples and clear it?
result = None
while(result is None):
#try:
result = get_data(path_list = self.path_list, path_no = self.path_no, start_from = self.already_given, num_to_read = self.chunk_size, skip_columns = ['experiment'])
self.already_given += self.chunk_size
print(result)
self.data = result
#raise Exception('Im sure this is the wrong way to debug')
if result is None:
self.path_no += 1
self.already_given = 0
if self.path_no > len(self.path_list):
raise StopIteration
return result
def preprocess_data(dataframe, label):
scaler = StandardScaler()
X_train = dataframe.drop(columns=[label], axis = 1)
y_train = dataframe[label]
#X_train = scaler.fit_transform(X_train) -> this would make it noniterable, not sure why
X_train = X_train.to_numpy()
y_train = pd.get_dummies(y_train)
features_train = torch.from_numpy(X_train).float()
labels_train = torch.from_numpy(y_train.to_numpy())#.type(torch.LongTensor)
return {'features' : features_train, 'labels' :labels_train}
def get_data(path_list, path_no, start_from, num_to_read, skip_columns = ['plzskip']):
if path_list is None:
raise Exception('Path is None but Live Mode not yet implemented!')
else: #please presepperate validation and test data somehow not sure how to do it here
try:
cols = list(pd.read_csv(path_list[path_no], nrows =1))
return_data_as_df = pd.read_csv(path_list[path_no], skiprows = start_from, nrows = num_to_read, usecols =[i for i in cols if i not in skip_columns] )
return_data_as_df = preprocess_data(
return_data_as_df, 'event')
#print('getting first data')
return return_data_as_df
except Exception as e:
print(e)
return None #probably ran out of stuff on that path
Ive tried several things including having it return itself an iterable database and it still gives me that same type error:
TypeError: iter() returned non-iterator of type ‘MyLSLIterableDataset’
I would really love your advice on how to handle either this error or a better strategy for dealing with streaming data