Handling streaming data

Hi everyone,

I’m in a pickle here. I receive streaming data and have a PyTorch model that should be trained continuously on it (in batches of 16 multi-feature data points). I have been trying to come up with a class/code structure that would facilitate this, but without success. My model is initialized through a class. I tried something like this:

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset


def train(model, data, batch_size):
    # Get data in the appropriate format (the input is also used as the target)
    train_data = TensorDataset(torch.from_numpy(np.array(data)), torch.from_numpy(np.array(data)))
    train_loader = DataLoader(train_data, shuffle=False, batch_size=batch_size)

    model = model.double()

    # Loss function and optimizer (both are re-created on every call)
    criterion = nn.MSELoss()
    optimizer = optim.RMSprop(model.parameters())

    # Train the model
    model.train()
    for data, label in train_loader:
        data = data.double()
        label = label.double()

        # Clear gradients of all optimized variables
        optimizer.zero_grad()

        # Convert data to the format (batch_size, seq_len, input_dimensions)
        data = data.view(batch_size, 1, data.size(1))

        # Forward pass
        output = model(data)

        # Calculate batch loss
        loss = criterion(output.squeeze(), label)

        # Backward pass
        loss.backward()

        # Perform a single optimization step
        optimizer.step()

    # Return the updated model and the loss of the last batch
    return model, loss

I then call the function on each collected batch individually, for example:

model, loss = train(model, data[:16], 16)

But this seems like a clumsy approach to me, and there has got to be a better one. Moreover, it does not give me the right losses anyway (I want the loss per batch as an output): the losses look as if the model were re-trained from scratch at every call. I thought that passing the model back would ensure that its weights are kept after each update, but I seem to be wrong.

Any help would be appreciated.

P.S.: I am not a software engineer but more of a domain expert data scientist and so please forgive me if I made any naive mistake.

It seems IterableDataset would be suitable for streaming data:

For example, such a dataset, when called iter(dataset), could return a stream of data reading from a database, a remote server, or even logs generated in real time.
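To make that concrete, here is a minimal sketch of how an IterableDataset could feed a training loop that keeps the model and optimizer alive across batches and records one loss per batch. Everything named here is an assumption for illustration: `StreamDataset`, `fake_stream` (a stand-in for the real data feed), the 8-feature size, and the small feed-forward model, which replaces the sequence model from the question.

```python
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, IterableDataset


class StreamDataset(IterableDataset):
    """Wraps any callable that returns a fresh iterator over feature vectors."""

    def __init__(self, source_fn):
        self.source_fn = source_fn

    def __iter__(self):
        for features in self.source_fn():
            x = torch.as_tensor(features, dtype=torch.float32)
            yield x, x  # input and target are the same, as in the question


def fake_stream(n_points=64, n_features=8):
    """Hypothetical stand-in for a real feed (socket, queue, log tail, ...)."""
    generator = torch.Generator().manual_seed(0)
    for _ in range(n_points):
        yield torch.randn(n_features, generator=generator)


# Placeholder model; swap in the real one.
model = nn.Sequential(nn.Linear(8, 4), nn.ReLU(), nn.Linear(4, 8))
criterion = nn.MSELoss()
# Created once, so the optimizer's running statistics survive between batches.
optimizer = optim.RMSprop(model.parameters())

# The DataLoader groups the stream into batches of 16 as samples arrive.
loader = DataLoader(StreamDataset(fake_stream), batch_size=16)

batch_losses = []
model.train()
for data, label in loader:
    optimizer.zero_grad()
    loss = criterion(model(data), label)
    loss.backward()
    optimizer.step()
    batch_losses.append(loss.item())  # one loss value per batch
```

The main design point is that the model, criterion and optimizer are built outside the loop, so nothing is reset between batches; only the data source changes when moving from a fake stream to a real one.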


Thank you. I’ll give it a try today.

Hello, is there any tutorial or example that carefully explains how to stream temporally coherent batches?

I put together this GitHub repository: https://github.com/etienne87/pytorch-streamloader
to explain precisely what I would like from PyTorch. (I wrote it and it works, but perhaps it can be done in 3 lines of code in PyTorch ^^.)


Hi there, since so many people clicked on that link, I just wanted to say that I simplified the code a bit, and that, in fact, the easiest option for people who train networks with memory could be to just use an IterableDataset where you yield the worker’s id along with your data. Otherwise, I wrote a wrapper around the PyTorch DataLoader that simply concatenates the data from different FIFOs indexed by the worker’s id.
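A minimal sketch of the first idea (yielding the worker's id together with the data) might look like the following. It is not the code from the repository above; `WorkerTaggedStream` and its synthetic samples are hypothetical, and the round-robin assignment of streams to workers is just one possible choice. `get_worker_info()` returns None when loading happens in the main process, so the sketch falls back to id 0 in that case.

```python
import torch
from torch.utils.data import DataLoader, IterableDataset, get_worker_info


class WorkerTaggedStream(IterableDataset):
    """Each worker reads its own ordered streams and tags every sample with its id."""

    def __init__(self, n_streams, length):
        self.n_streams = n_streams
        self.length = length

    def __iter__(self):
        info = get_worker_info()  # None when loading in the main process
        worker_id = 0 if info is None else info.id
        num_workers = 1 if info is None else info.num_workers
        # Assign streams round-robin so no two workers read the same stream.
        for stream_id in range(worker_id, self.n_streams, num_workers):
            for t in range(self.length):
                # Synthetic sample: (stream id, time step) in place of real data.
                sample = torch.tensor([float(stream_id), float(t)])
                yield worker_id, sample


loader = DataLoader(
    WorkerTaggedStream(n_streams=2, length=4),
    batch_size=4,
    num_workers=0,  # main-process loading keeps the sketch simple to run
)

seen = []
for worker_ids, batch in loader:
    # worker_ids tells the consumer which recurrent state this batch continues.
    seen.append((worker_ids.tolist(), batch[:, 1].tolist()))
```

With `num_workers` greater than 0, each batch is assembled entirely by one worker, so the ids inside a batch are identical and can be used to look up the hidden state that belongs to that worker's stream.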