Hello everyone, I am developing a system that will classify the load on a block device in a storage system. I’m going to train the model on the already collected sequences, and with the inference, the data will come in the form of a single element similar to the sequence that was trained. My problem is that the losses on training and validation do not fall below one and val accuracy where low, like 35%. Here is my code, it has all the necessary comments. At the moment, I am applying synthetic data for training. There are the same problems with real data.
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import numpy as np
from sklearn.model_selection import train_test_split
from torch.nn.utils.rnn import pack_padded_sequence, pad_sequence, pad_packed_sequence
# --- Data Preparation ---
def generate_visual_data(num_sequences=300):
data, labels = [], []
for i in range(num_sequences):
class_type = np.random.choice([0, 1, 2])
sequence_length = np.random.randint(5, 15)
if class_type == 0: # Short bursts of high delta_ns
sequence = [{'delta_ns': np.random.randint(80000, 100000) if j < 3 else np.random.randint(100, 1000),
'len': np.random.choice([1024, 2048, 4096]),
'pattern': np.random.randint(0, 2),
'qdelta_ns': np.random.randint(500, 5000),
'rwflg': np.random.randint(0, 2),
'sector': np.random.randint(100000, 150000),
'ts': np.random.randint(1000000, 5000000)} for j in range(sequence_length)]
elif class_type == 1: # Alternating pattern values
sequence = [{'delta_ns': np.random.randint(100, 100000),
'len': np.random.choice([1024, 2048, 4096]),
'pattern': j % 2,
'qdelta_ns': np.random.randint(500, 5000),
'rwflg': np.random.randint(0, 2),
'sector': np.random.randint(100000, 150000),
'ts': np.random.randint(1000000, 5000000)} for j in range(sequence_length)]
else: # Gradually increasing len values
sequence = [{'delta_ns': np.random.randint(100, 100000),
'len': 1024 + j * 200,
'pattern': np.random.randint(0, 2),
'qdelta_ns': np.random.randint(500, 5000),
'rwflg': np.random.randint(0, 2),
'sector': np.random.randint(100000, 150000),
'ts': np.random.randint(1000000, 5000000)} for j in range(sequence_length)]
data.append(sequence)
labels.append(class_type)
return data, labels
def generate_data(num_sequences, sequence_length_range):
data, labels = [], []
for _ in range(num_sequences):
sequence_length = np.random.randint(*sequence_length_range)
sequence = [{
"delta_ns": np.random.randint(100, 100000),
"len": np.random.choice([1024, 2048, 4096]),
"pattern": np.random.randint(0, 2),
"qdelta_ns": np.random.randint(500, 5000),
"rwflg": np.random.randint(0, 2),
"sector": np.random.randint(100000, 150000),
"ts": np.random.randint(1000000, 5000000)
} for _ in range(sequence_length)]
label = np.random.randint(0, 3) # Assuming 3 classes
data.append(sequence)
labels.append(label)
return data, labels
class SequenceDataset(Dataset):
def __init__(self, sequences, labels):
self.sequences = sequences
self.labels = labels
def __len__(self):
return len(self.labels)
def __getitem__(self, idx):
sequence = self.sequences[idx]
label = self.labels[idx]
features = [[seq['delta_ns'], seq['len'], seq['pattern'], seq['qdelta_ns'], seq['rwflg'], seq['sector'], seq['ts']] for seq in sequence]
return torch.tensor(features, dtype=torch.float), torch.tensor(label, dtype=torch.long), len(sequence)
def collate_fn(batch):
sequences, labels, lengths = zip(*batch)
sequences_padded = pad_sequence(sequences, batch_first=True)
labels = torch.tensor(labels, dtype=torch.long)
lengths = torch.tensor(lengths, dtype=torch.long)
return sequences_padded, labels, lengths
# --- Model Definition ---
class CNNLSTMClassifier(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout_rate=0.5):
super(CNNLSTMClassifier, self).__init__()
self.cnn = nn.Conv1d(in_channels=input_size, out_channels=hidden_size, kernel_size=3, stride=1, padding=1)
self.relu = nn.ReLU()
self.dropout = nn.Dropout(dropout_rate)
self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers, batch_first=True, dropout=dropout_rate, bidirectional=True)
self.fc = nn.Linear(hidden_size * 2, num_classes) # *2 for bidirectional
def forward(self, x, lengths, hidden=None):
# Adjust x to match Conv1d input expectation: (batch, channels, seq_len)
x = x.permute(0, 2, 1)
x = self.cnn(x)
x = self.relu(x)
x = self.dropout(x)
# Return to (batch, seq_len, features) for LSTM
x = x.permute(0, 2, 1)
# Pack the sequence if lengths are provided for handling variable lengths
x_packed = pack_padded_sequence(x, lengths.cpu(), batch_first=True, enforce_sorted=False)
out, hidden = self.lstm(x_packed, hidden) # Pass hidden state if provided
out, _ = pad_packed_sequence(out, batch_first=True)
out = self.dropout(out[:, -1, :])
out = self.fc(out)
return out, hidden # Return output and hidden state for stateful processing
# Parameters
num_sequences = 200
input_size = 7 # Original number of features, treated as in_channels for CNN
cnn_output_size = 64 # Output channels from CNN, serving as features for LSTM
hidden_size = 128
num_layers = 2
num_classes = 3
dropout_rate = 0.5
batch_size = 32 # Batch size
learning_rate = 0.0001 # Learning rate
num_epochs = 100 # Number of epochs to train
data, labels = generate_visual_data(num_sequences)
train_data, val_data, train_labels, val_labels = train_test_split(data, labels, test_size=0.2, random_state=42)
train_dataset = SequenceDataset(train_data, train_labels)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_dataset = SequenceDataset(val_data, val_labels)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
# --- Training and Validation ---
# Assuming `train_loader` and `val_loader` are already defined
# Model instantiation
model = CNNLSTMClassifier(input_size, hidden_size, num_layers, num_classes, dropout_rate)
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
#scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1) # Learning rate scheduler
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=10, factor=0.5,verbose=True)
# Training Loop
for epoch in range(num_epochs):
model.train()
train_loss = 0.0
for sequences, labels, lengths in train_loader:
optimizer.zero_grad()
outputs, _ = model(sequences, lengths)
loss = criterion(outputs, labels)
train_loss += loss.item()
loss.backward()
optimizer.step()
# Validation phase
model.eval()
val_loss = 0.0
correct = 0
total = 0
with torch.no_grad():
for sequences, labels, lengths in val_loader:
outputs, _ = model(sequences, lengths)
loss = criterion(outputs, labels)
val_loss += loss.item()
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
train_loss /= len(train_loader)
val_loss /= len(val_loader)
accuracy = 100 * correct / total
print(f'Epoch: {epoch+1}/{num_epochs} \tTraining Loss: {train_loss:.4f} \tValidation Loss: {val_loss:.4f} \tAccuracy: {accuracy:.2f}%')
# Step the scheduler
#scheduler.step()
scheduler.step(val_loss)
# --- Stateful Inference ---
new_sequence = [
{"delta_ns": 150, "len": 2048, "pattern": 1, "qdelta_ns": 500, "rwflg": 0, "sector": 120000, "ts": 2000000},
{"delta_ns": 200, "len": 1024, "pattern": 0, "qdelta_ns": 700, "rwflg": 1, "sector": 130000, "ts": 3000000},
]
new_features = [[seq['delta_ns'], seq['len'], seq['pattern'], seq['qdelta_ns'], seq['rwflg'], seq['sector'], seq['ts']] for seq in new_sequence]
new_features_tensor = torch.tensor([new_features], dtype=torch.float)
model.eval()
hidden = (torch.zeros(num_layers * 2, 1, hidden_size), # *2 for bidirectional
torch.zeros(num_layers * 2, 1, hidden_size)) # Initially, there's no hidden state
# Assuming new_features_tensor is prepared correctly
for i in range(new_features_tensor.size(1)):
single_feature = new_features_tensor[:, i:i+1, :]
with torch.no_grad():
# Note: If your model still requires lengths, you'd pass [1] for each single step
output, hidden = model(single_feature, torch.tensor([1]), hidden=hidden)
probabilities = torch.softmax(output, dim=1)
print(f'Step {i+1}, Class Probabilities: {probabilities.numpy()}')