Using iterable datasets

Hello,

I'm trying to set up a process to deal with streaming sensor data. I understand this is the exact use case for iterable datasets. Unfortunately, it seems to be a relatively new feature, and tutorials on how to make and use iterable datasets appear to be rare.

To start, I am mocking the stream by making my __iter__ function grab from a CSV. I am not sure exactly what my __iter__ function should be returning. Currently it returns a TensorDataset, because that's what I could find examples of consuming in training/validation, but it complains: TypeError: iter() returned non-iterator of type 'TensorDataset'.

How do I package the data together with its labels for a DataLoader to use, so I can train my model?

Thank you

Hi @pzg, welcome to the forums!

Have you taken a look at this tutorial on dataloading?

For specific advice, it'd be easier if you could post an example of your code so far.

Hello @JamesTrick
Thank you for the advice. I had seen that tutorial, but thought it didn't apply because it covers map-style datasets, which I understand don't translate well to streaming data. I tried switching back to a plain Dataset, but I would have no idea how to set the length when receiving live sensor data in real time, which is exactly what I was hoping IterableDataset would help solve.

The real goal is to set up a training flow that accepts real-time streaming data with no end known beforehand.

I have taken the advice to return a dict as stated in the tutorial, but it complains that it's not getting an iterator.

I am calling it from the following snippet in the training function, and it raises the following error (the print statement is never reached):

TypeError: iter() returned non-iterator of type 'dict'

trainer = datasets.MyLSLIterableDataset(path_list=path_list_train)
for i, (features, labels) in enumerate(trainer):
    print('entered first enumeration')

My dataset implementation is:

class MyLSLIterableDataset(torch.utils.data.IterableDataset):
    def __init__(self, num_columns, chunk_size=512, path_list=None):
        super().__init__()
        self.resolver = ContinuousResolver()
        self.inlets = []
        self.path_list = path_list
        self.already_given = 0
        self.path_no = 0
        self.chunk_size = chunk_size

    def __iter__(self):
        # take data from the buffer, say a chunk of 1000 buffered samples, and clear it?
        result = None
        while result is None:
            result = get_data(path_list=self.path_list,
                              path_no=self.path_no,
                              start_from=self.already_given,
                              num_to_read=self.chunk_size,
                              skip_columns=['experiment'])
            self.already_given += self.chunk_size
            print(result)
            self.data = result
            if result is None:
                self.path_no += 1
                self.already_given = 0
                if self.path_no >= len(self.path_list):
                    raise StopIteration
        return result


def preprocess_data(dataframe, label):
    scaler = StandardScaler()
    X_train = dataframe.drop(columns=[label])
    y_train = dataframe[label]
    # X_train = scaler.fit_transform(X_train) -> this would make it non-iterable, not sure why
    X_train = X_train.to_numpy()
    y_train = pd.get_dummies(y_train)
    features_train = torch.from_numpy(X_train).float()
    labels_train = torch.from_numpy(y_train.to_numpy())
    return {'features': features_train, 'labels': labels_train}


def get_data(path_list, path_no, start_from, num_to_read, skip_columns=['plzskip']):
    if path_list is None:
        raise Exception('Path is None but Live Mode not yet implemented!')
    else:
        # TODO: pre-separate validation and test data somehow; not sure how to do that here
        try:
            cols = list(pd.read_csv(path_list[path_no], nrows=1))
            return_data_as_df = pd.read_csv(path_list[path_no],
                                            skiprows=start_from,
                                            nrows=num_to_read,
                                            usecols=[i for i in cols if i not in skip_columns])
            return_data_as_df = preprocess_data(return_data_as_df, 'event')
            return return_data_as_df
        except Exception as e:
            print(e)
            return None  # probably ran out of rows on that path

I've tried several things, including having __iter__ return the dataset object itself, and it still gives me the same type error:

TypeError: iter() returned non-iterator of type 'MyLSLIterableDataset'

I would really appreciate advice on how to handle this error, or a better strategy for dealing with streaming data.

In case anyone else runs into this: it turns out that to implement an iterator of any kind, __iter__ needs to return self, and you also need to implement a __next__ method that returns the actual data.

This may be clear to anyone familiar with iterators, but it wasn't immediately obvious to me from the IterableDataset documentation, which only spoke of overriding __iter__.
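
To make it concrete, here is a minimal sketch of the pattern that fixed it for me. The class name and the list of random chunks are just placeholders standing in for a real buffered stream, not my actual pipeline:

import torch

class StreamingDataset(torch.utils.data.IterableDataset):
    def __init__(self, chunks):
        super().__init__()
        self.chunks = chunks  # stand-in for a live buffer; here just a list
        self.pos = 0

    def __iter__(self):
        # __iter__ must return an iterator; returning self works
        # because __next__ is defined below
        return self

    def __next__(self):
        # __next__ does the actual fetching and signals the end of the stream
        if self.pos >= len(self.chunks):
            raise StopIteration
        chunk = self.chunks[self.pos]
        self.pos += 1
        return chunk

# a plain for-loop (or a DataLoader) can now iterate it
dataset = StreamingDataset([torch.randn(4, 3) for _ in range(5)])
for i, chunk in enumerate(dataset):
    print(i, chunk.shape)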


It's weird that the documentation didn't mention __next__ at all, considering it plays the important role of actually fetching the data.
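
For what it's worth, I think that's because __iter__ only has to return an iterator, and it doesn't have to be the dataset itself. If you write __iter__ as a generator function (using yield), Python builds the iterator, including its __next__, for you, so a separate __next__ on the dataset is only needed when __iter__ returns self. A minimal sketch of the generator style, again with a placeholder list standing in for a live stream:

import torch

class GeneratorStyleDataset(torch.utils.data.IterableDataset):
    def __init__(self, chunks):
        super().__init__()
        self.chunks = chunks  # stand-in for a live stream

    def __iter__(self):
        # a generator function: Python supplies the iterator's __next__
        for chunk in self.chunks:
            yield chunk

for chunk in GeneratorStyleDataset([torch.randn(4, 3) for _ in range(2)]):
    print(chunk.shape)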