Hi.
I want to apply a VGG16-style network to time series data.
Each sample is a window of 300 time steps over 23 features, arranged as (batch, channel, length, feature):
(Batch, Channel, Length, Feature) => (?, 1, 300, 23)
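To make the layout concrete, this is roughly how each sample is built (a small sketch with made-up data, not my actual preprocessing):

import torch

# hypothetical example: 1000 time steps of 23 features
raw = torch.randn(1000, 23)
# sliding windows of 300 steps, one window ending at each index i
windows = torch.stack([raw[i - 300:i] for i in range(300, len(raw))])  # (700, 300, 23)
windows = windows.unsqueeze(1)  # add the channel dimension -> (700, 1, 300, 23)
print(windows.shape)  # torch.Size([700, 1, 300, 23])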
Training goes well, but the model performs poorly on the test set.
What part is the problem?
Here is my code
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, utils
from sklearn.preprocessing import MinMaxScaler
from torch.autograd import Variable
from torch.utils.data import WeightedRandomSampler
import visdom
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
else:
    DEVICE = torch.device('cpu')
print('Using Pytorch Version: ', torch.__version__,
      'Device: ', DEVICE)
EPOCHS = 201
batch_size = [1024,512,256,128,64,32]
learning_rates = [0.001,0.0001]
vis = visdom.Visdom(env='LC')
class FeatureDataset(Dataset):
    def __init__(self, file_name):
        _x, _y = [], []
        file_out = pd.read_csv(file_name)
        x = file_out.iloc[:, 0:-1].values
        y = file_out.iloc[:, -1].values
        # scale features to [0, 1]; the scaler is fit on this file's own data
        mMscaler = MinMaxScaler()
        x = mMscaler.fit_transform(x)
        # build sliding windows of 300 time steps, labelled by the value at the window end
        for i in range(300, len(y)):
            _x.append(x[i-300:i, :])
            _y.append(y[i])
        ax = np.array(_x)
        ay = np.array(_y)
        ax = torch.tensor(ax, dtype=torch.float32)
        self.x = ax.unsqueeze(dim=1)                 # add channel dim -> (N, 1, 300, 23)
        self.y = torch.tensor(ay, dtype=torch.long)  # int64 labels for CrossEntropyLoss

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]

arr = FeatureDataset('train.csv')
arr2 = FeatureDataset('test.csv')
print(arr.x.shape)

train_loader = torch.utils.data.DataLoader(arr, batch_size=32,
                                           shuffle=True, drop_last=True)
for x, y in train_loader:
    print(x.shape)
    print(y.shape)
    break
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        # block 1: the (3, 23) kernel collapses the 23-feature axis to width 1
        self.conv1_1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=(3, 23))
        self.conv1_2 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=(3, 1))
        self.batch2d_1 = nn.BatchNorm2d(64, momentum=0.9)
        # block 2
        self.conv2_1 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=(3, 1))
        self.conv2_2 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=(3, 1))
        self.batch2d_2 = nn.BatchNorm2d(128, momentum=0.9)
        # block 3
        self.conv3_1 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=(3, 1))
        self.conv3_2 = nn.Conv2d(in_channels=256, out_channels=256, kernel_size=(3, 1))
        self.batch2d_3 = nn.BatchNorm2d(256, momentum=0.9)
        # block 4
        self.conv4_1 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=(3, 1))
        self.conv4_2 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=(3, 1))
        self.batch2d_4 = nn.BatchNorm2d(512, momentum=0.9)
        # block 5
        self.conv5_1 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=(3, 1))
        self.conv5_2 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=(3, 1))
        self.batch2d_5 = nn.BatchNorm2d(512, momentum=0.9)
        # pooling along the time axis only (defined once; the original assigned the same layer five times)
        self.pool = nn.MaxPool2d(kernel_size=(2, 1), stride=(2, 1))
        self.drop = nn.Dropout(0.5)  # defined but not used in forward()
        self.fc1 = nn.Linear(5 * 512, 4096)
        self.fc2 = nn.Linear(4096, 4)
        self.batch1d = nn.BatchNorm1d(4096, momentum=0.9)

    def forward(self, x):
        # block 1
        x = self.conv1_1(x)
        x = self.batch2d_1(x)
        x = F.relu(x)
        x = self.conv1_2(x)
        x = self.batch2d_1(x)  # the same BatchNorm instance is applied after both convs in each block
        x = F.relu(x)
        x = self.pool(x)
        # block 2
        x = self.conv2_1(x)
        x = self.batch2d_2(x)
        x = F.relu(x)
        x = self.conv2_2(x)
        x = self.batch2d_2(x)
        x = F.relu(x)
        x = self.pool(x)
        # block 3
        x = self.conv3_1(x)
        x = self.batch2d_3(x)
        x = F.relu(x)
        x = self.conv3_2(x)
        x = self.batch2d_3(x)
        x = F.relu(x)
        x = self.pool(x)
        # block 4
        x = self.conv4_1(x)
        x = self.batch2d_4(x)
        x = F.relu(x)
        x = self.conv4_2(x)
        x = self.batch2d_4(x)
        x = F.relu(x)
        x = self.pool(x)
        # block 5
        x = self.conv5_1(x)
        x = self.batch2d_5(x)
        x = F.relu(x)
        x = self.conv5_2(x)
        x = self.batch2d_5(x)
        x = F.relu(x)
        x = self.pool(x)
        # classifier
        x = x.view(-1, 5 * 512)
        x = self.fc1(x)
        x = self.batch1d(x)
        x = F.relu(x)
        x = self.fc2(x)
        return x

def train(model, train_loader, optimizer):
    cnt_0 = 0
    cnt_1 = 0
    cnt_2 = 0
    cnt_3 = 0
    train_loss = 0
    train_correct = 0
    model.train()
    cnt = len(train_loader)
    for train_x, train_y in train_loader:
        train_x = train_x.to(DEVICE)
        train_y = train_y.to(DEVICE)
        optimizer.zero_grad()
        output = model(train_x)
        loss = criterion(output, train_y)
        loss.backward()
        optimizer.step()
        prediction = output.max(1, keepdim=True)[1]
        # count how many samples were predicted as each class
        for i in range(len(prediction)):
            pre = int(prediction[i])
            if pre == 0:
                cnt_0 += 1
            elif pre == 1:
                cnt_1 += 1
            elif pre == 2:
                cnt_2 += 1
            elif pre == 3:
                cnt_3 += 1
        train_correct += prediction.eq(train_y.view_as(prediction)).sum().item()
        train_loss += loss.item()
    train_loss /= cnt
    train_accuracy = 100. * train_correct / len(arr.y)
    if epoch % 5 == 0:
        print(cnt_0, ' ', cnt_1, ' ', cnt_2, ' ', cnt_3)
        print("[Train EPOCH: {}], \tTrain Loss: {:.4f}, \tTrain Accuracy: {:.2f} % \n".format(
            epoch, train_loss, train_accuracy))
    return train_loss, train_accuracy

def evaluate(model, test_loader):
    test_loss = 0
    test_correct = 0
    model.eval()
    cnt_0 = 0
    cnt_1 = 0
    cnt_2 = 0
    cnt_3 = 0
    cnt = len(test_loader)
    with torch.no_grad():
        for test_x, test_y in test_loader:
            test_x = test_x.to(DEVICE)
            test_y = test_y.to(DEVICE)
            output = model(test_x)
            loss = criterion(output, test_y)
            test_loss += loss.item()
            prediction = output.max(1, keepdim=True)[1]
            # count how many samples were predicted as each class
            for i in range(len(prediction)):
                pre = int(prediction[i])
                if pre == 0:
                    cnt_0 += 1
                elif pre == 1:
                    cnt_1 += 1
                elif pre == 2:
                    cnt_2 += 1
                elif pre == 3:
                    cnt_3 += 1
            test_correct += prediction.eq(test_y.view_as(prediction)).sum().item()
    test_loss /= cnt
    test_accuracy = 100. * test_correct / len(arr2.y)
    if epoch % 5 == 0:
        print(cnt_0, ' ', cnt_1, ' ', cnt_2, ' ', cnt_3)
        print("[EPOCH: {}], \tTest Loss: {:.4f}, \tTest Accuracy: {:.2f} % \n".format(
            epoch, test_loss, test_accuracy))
    return test_loss, test_accuracy

EPOCHS = 201
batch_size = [1024, 512, 256, 128, 64, 32]
learning_rates = [0.0001, 0.0001]
for BATCH_SIZE in batch_size:
    for learning_rate in learning_rates:
        model = CNN().to(DEVICE)
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=0.00005)
        criterion = nn.CrossEntropyLoss()
        train_loader = torch.utils.data.DataLoader(arr, batch_size=BATCH_SIZE,
                                                   shuffle=True)
        tit = '_' + str(BATCH_SIZE) + '_' + str(learning_rate)
        loss_plt = vis.line(Y=torch.Tensor(1, 2).zero_(),
                            opts=dict(title='loss' + tit, legend=['train_loss', 'test_loss'],
                                      showlegend=True))
        accuracy_plt = vis.line(Y=torch.Tensor(1, 2).zero_(),
                                opts=dict(title='accuracy' + tit, legend=['train_accuracy', 'test_accuracy'],
                                          showlegend=True))
        test_loader = torch.utils.data.DataLoader(arr2, batch_size=BATCH_SIZE, shuffle=False)
        print("BATCH_SIZE : ", BATCH_SIZE, "lr = ", learning_rate)
        for epoch in range(EPOCHS):
            train_loss, train_accuracy = train(model, train_loader, optimizer)
            loss, accuracy = evaluate(model, test_loader)
            loss = torch.Tensor([[train_loss, loss]])
            accuracy = torch.Tensor([[train_accuracy, accuracy]])
            vis.line(X=torch.Tensor([epoch]), Y=loss, win=loss_plt, update='append')
            vis.line(X=torch.Tensor([epoch]), Y=accuracy, win=accuracy_plt, update='append')
            if epoch % 5 == 0:
                print('--------------------------')
            if epoch % 10 == 0:
                learning_rate * 0.001  # note: as written this has no effect (the result is never assigned or passed to the optimizer)
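For reference, this is the quick shape check I use to confirm that a (batch, 1, 300, 23) input runs through the network and that the flattened size feeding fc1 is 5*512 (a minimal sketch; the batch size of 8 is arbitrary):

check_model = CNN().to(DEVICE)
check_model.eval()  # eval mode so BatchNorm uses its running statistics
dummy = torch.zeros(8, 1, 300, 23).to(DEVICE)
with torch.no_grad():
    out = check_model(dummy)
print(out.shape)  # torch.Size([8, 4]); the last pooling layer outputs (8, 512, 5, 1), hence 5*512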