Recently, I’ve been building a model to predict the driver’s intention using data sampled at 10 Hz.

I try to predict the driver’s intention from the data of the preceding 3 seconds.

So I set the window size to 3 seconds (30 samples at 10 Hz).

The input has 25 features, so its shape is (batch size) × (30 time steps) × (25 features).

The output is one of three classes: 0, 1, or 2.

I therefore treated this as a multi-class classification problem and built the model as follows.

I used the ReLU activation, dropout, and batch normalization because the training and test results were very different.

I think it’s because of overfitting, but I don’t know how to solve it.

Finally, I used WeightedRandomSampler to address the class imbalance in the data used by the model.

I’ll attach the whole code.

```
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, utils
from torchviz import make_dot
from sklearn.preprocessing import MinMaxScaler
from torch.autograd import Variable
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import WeightedRandomSampler
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using Pytorch Version: ', torch.__version__,
      'Device: ', DEVICE)

# Upper bound on training epochs for every hyper-parameter combination.
EPOCHS = 1000
# Hyper-parameter grid: each batch size is combined with each learning rate.
batch_size = [64, 128, 256, 512, 1024]
learning_rates = [0.001, 0.0001]
class FeatureDataset(Dataset):
    """Sliding-window dataset built from a CSV of per-time-step rows.

    Every column except the last is treated as a feature; the last column is
    the class label.  Sample ``k`` consists of the ``WINDOW`` consecutive rows
    ``k .. k + WINDOW - 1`` and is labelled with the row
    ``k + WINDOW + HORIZON - 1`` (i.e. ``HORIZON - 1`` rows after index
    ``k + WINDOW``).

    Args:
        file_name: Path of the CSV file to load.
        scaler: Optional, already *fitted* scaler exposing ``transform``.
            When given, the features are transformed with it instead of
            fitting a new one, so a test set can be scaled with exactly the
            statistics of the training set (pass ``train_dataset.scaler``).
            When omitted, a fresh ``MinMaxScaler`` is fitted on this file.

    Attributes set on the instance:
        x: ``float32`` tensor of shape ``(num_samples, WINDOW, num_features)``.
        y: ``int64`` tensor of shape ``(num_samples,)`` with the class labels.
        scaler: The scaler that was used to transform the features.
    """

    WINDOW = 30   # rows per input window
    HORIZON = 30  # label row is HORIZON - 1 rows after the window's end index

    def __init__(self, file_name, scaler=None):
        file_out = pd.read_csv(file_name)
        x = file_out.iloc[:, 0:-1].values
        y = file_out.iloc[:, -1].values

        if scaler is None:
            # fit_transform already fits; a separate fit() call is redundant.
            scaler = MinMaxScaler()
            x = scaler.fit_transform(x)
        else:
            x = scaler.transform(x)
        self.scaler = scaler

        window, horizon = self.WINDOW, self.HORIZON
        # Exclusive end indices of the windows: range(30, len(y) - 29).
        ends = range(window, len(y) - horizon + 1)
        if len(ends) > 0:
            ax = np.stack([x[end - window:end, :] for end in ends])
            # Label of the window ending at `end` is y[end + horizon - 1].
            ay = np.asarray(y[window + horizon - 1:])
        else:
            # File too short for a single window: keep well-formed empty tensors.
            ax = np.empty((0, window, x.shape[1]))
            ay = np.empty((0,))

        self.x = torch.tensor(ax, dtype=torch.float32)
        # Class-index targets must be int64 for nn.CrossEntropyLoss.
        self.y = torch.tensor(ay).long()

    def __len__(self):
        """Return the number of (window, label) samples."""
        return len(self.y)

    def __getitem__(self, idx):
        """Return the ``(window, label)`` pair at position ``idx``."""
        return self.x[idx], self.y[idx]
class Model(nn.Module):
    """Single-layer LSTM classifier over windows of 25 features, 3 classes.

    Pipeline: LSTM(25 -> 30) -> last time step -> Dropout(0.5)
    -> Linear(30 -> 10) -> BatchNorm1d -> ReLU -> Linear(10 -> 3).
    """

    def __init__(self):
        super().__init__()
        self.lstm = nn.LSTM(input_size=25, hidden_size=30, num_layers=1,
                            batch_first=True)
        self.linear = nn.Linear(30, 10)
        self.linear1 = nn.Linear(10, 3)
        self.batch_norm1 = nn.BatchNorm1d(10)
        self.drop = nn.Dropout(p=0.5)

    def forward(self, input_seq):
        """Return unnormalised class scores (logits) of shape ``(batch, 3)``.

        Args:
            input_seq: Tensor of shape ``(batch, seq_len, 25)``
                (the LSTM is built with ``batch_first=True``).

        The output is deliberately NOT passed through softmax:
        ``nn.CrossEntropyLoss`` applies log-softmax itself, so a softmax here
        would be applied twice and flatten the gradients.  ``argmax`` over the
        logits gives the same predicted class as over softmax probabilities;
        call ``F.softmax(logits, dim=1)`` only when probabilities are needed.
        """
        output_seq, _ = self.lstm(input_seq)
        # Summarise the sequence by the hidden state of its final time step.
        last_output = output_seq[:, -1]
        hidden = self.drop(last_output)
        hidden = self.linear(hidden)
        hidden = self.batch_norm1(hidden)
        hidden = F.relu(hidden)
        return self.linear1(hidden)
# Training and held-out test windows.
arr = FeatureDataset('Untitled 5.csv')
arr2 = FeatureDataset('real_test.csv')
print(arr.x.shape)

# Inverse-frequency class weights derived from the labels that are actually in
# the training set, instead of hard-coded counts that silently go stale when
# the CSV changes.  clamp(min=1) avoids a division by zero for an absent class.
class_counts = torch.bincount(arr.y.long(), minlength=3).clamp(min=1)
class_weights = (1.0 / class_counts.double()).tolist()

# One weight per sample, looked up by its label in a single indexing operation
# rather than a Python loop over the whole dataset.
sample_weights = torch.tensor(class_weights, dtype=torch.double)[arr.y.long()]

# Draw with replacement so every class is seen about equally often per epoch.
sampler = WeightedRandomSampler(sample_weights, num_samples=len(sample_weights), replacement=True)
train_loader = torch.utils.data.DataLoader(arr,
                                           batch_size=32, sampler=sampler,
                                           shuffle=False)

# Sanity check: show the shape of one training batch.
for trainx, trainy in train_loader:
    print(trainx.size(), trainx.shape)
    break
def _run_epoch(model, loader, criterion, optimizer=None, num_classes=3):
    """Run one pass over ``loader`` and return ``(mean_loss, accuracy, counts)``.

    Args:
        model: Network to train or evaluate.
        loader: DataLoader yielding ``(inputs, labels)`` batches.
        criterion: Loss function called as ``criterion(output, labels)``.
        optimizer: When given, the model is put in training mode and updated
            after every batch; when ``None`` the model is evaluated with
            gradients disabled.
        num_classes: Number of classes the model predicts; it sets the length
            of the per-class prediction tally and must cover every index the
            model can output.

    Returns:
        mean_loss: Average of the per-batch losses.
        accuracy: Percentage of correctly classified samples.
        counts: List with the number of predictions made for each class.
    """
    is_training = optimizer is not None
    model.train(is_training)

    total_loss = 0.0
    correct = 0
    counts = torch.zeros(num_classes, dtype=torch.long, device=DEVICE)

    with torch.set_grad_enabled(is_training):
        for batch_x, batch_y in loader:
            batch_x = batch_x.to(DEVICE)
            batch_y = batch_y.to(DEVICE)

            if is_training:
                optimizer.zero_grad()
            output = model(batch_x)
            loss = criterion(output, batch_y)
            if is_training:
                loss.backward()
                optimizer.step()

            prediction = output.argmax(dim=1)
            # Tally predictions per class on the device; comparing Python ints
            # with `is` is an identity check and must not be used for this.
            counts += torch.bincount(prediction, minlength=num_classes)
            correct += (prediction == batch_y).sum().item()
            total_loss += loss.item()

    # Average over the real number of batches (the last batch may be partial).
    mean_loss = total_loss / max(len(loader), 1)
    accuracy = 100. * correct / max(len(loader.dataset), 1)
    return mean_loss, accuracy, counts.tolist()


# Grid search: train a fresh model for every (batch size, learning rate) pair.
for BATCH_SIZE in batch_size:
    for learning_rate in learning_rates:
        model = Model().to(DEVICE)
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
        criterion = nn.CrossEntropyLoss()
        train_loader = torch.utils.data.DataLoader(arr,
                                                   batch_size=BATCH_SIZE,
                                                   sampler=sampler)
        # The test loader does not change between epochs, so build it once.
        test_loader = torch.utils.data.DataLoader(arr2, batch_size=BATCH_SIZE,
                                                  shuffle=False)
        print("BATCH_SIZE : ", BATCH_SIZE, "lr = ", learning_rate)

        for epoch in range(EPOCHS):
            train_loss, train_accuracy, (cnt_0, cnt_1, cnt_2) = _run_epoch(
                model, train_loader, criterion, optimizer)
            print(cnt_0, ' ', cnt_1, ' ', cnt_2)
            print("[Train EPOCH: {}], \tTrain Loss: {:.4f}, \tTrain Accuracy: {:.2f} % \n".format(
                epoch, train_loss, train_accuracy))

            test_loss, test_accuracy, (cnt_0, cnt_1, cnt_2) = _run_epoch(
                model, test_loader, criterion)
            print(cnt_0, ' ', cnt_1, ' ', cnt_2)
            print("[EPOCH: {}], \tTest Loss: {:.4f}, \tTest Accuracy: {:.2f} % \n".format(
                epoch, test_loss, test_accuracy))
            print('-----------------------------------')
```

Your answers always help me a lot. It’s my first time writing a question, so please bear with me if anything is unclear.