So, I’m training a neural network architecture on a particular wave signal detection.
the raw data are in .npy files and contains the time domain signal.
I did some feature extraction on the time domain signal like:
Discrete Fourier Transform to convert the time domain signal to frequency domain signal
I also got the magnitude of the complex numbers returned by the transform to get the amplitude spectrum, and the square of that to get the power spectrum (which were the two features I used)
My raw data shape is (N, 3, 4096), where N is number of samples, 3 corresponds to the 3 different detectors in the detector network used, and 4096 is the sample frequency.
My processed data shape is (N, 3, 2, 2049), where N is number of samples, 3 corresponds to the 3 different detectors in the detector network used, 2 for the amplitude spectrum and power spectrum features, and 2049 corresponding to the length of each feature
The extracted features are scaled to be within range of 0 and 1 with sklean MinMaxScaler.
The training and testing accuracy never exceeds 51%, however the loss seems to be reducing fine and I know that the loss and accuracy of classification problem that uses Cross Entropy are unrelated
Any suggestions?
Here’s code for my feature extraction class:
#Feature Extraction Class
class FeatureExtractor():
def __init__(self, input, sample_frequency=4096):
self.fs = sample_frequency
self.input = np.array(input.reshape(-1, 3, self.fs))
self.min_max_scaler = MinMaxScaler()
self.fourier_transform = self.__discrete_fourier_transform()
self.power_spectrum = self.__power_spectrum()
def __discrete_fourier_transform(self):
transform = np.fft.rfft(self.input)
transform = np.abs(transform)
return transform
def __power_spectrum(self):
PS = self.fourier_transform**2
return PS
def extract_features(self):
required_shape = (self.fourier_transform.shape[0], self.fourier_transform.shape[1], self.fourier_transform.shape[2])
Dim_1_transform = self.fourier_transform.reshape(-1, required_shape[2]).get()
Dim_1_power_spectrum = self.power_spectrum.reshape(-1, required_shape[2]).get()
scaled_fourier_transform = self.min_max_scaler.fit_transform(Dim_1_transform)
scaled_PSD = self.min_max_scaler.fit_transform(Dim_1_power_spectrum)
scaled_fourier_transform, scaled_PSD = scaled_fourier_transform.reshape(required_shape[0], required_shape[1], required_shape[2]), scaled_PSD.reshape(required_shape[0], required_shape[1], required_shape[2])
features = np.concatenate((np.array(scaled_fourier_transform), np.array(scaled_PSD)))
feature = features.transpose(1, 0, 2)
return features.reshape(-1, 3, 2, scaled_PSD.shape[-1])
My neural network consists of a CNN with 5 conv2d layers and a Feed Forward Network with 4 Linear layers like so:
#convolutional neural network class
class ConvolutionalNework(nn.Module):
def __init__(self, in_channels, out_channels, hidden_channels):
super(ConvolutionalNework, self).__init__()
self.in_channels = in_channels
self.out_channels = out_channels
self.hidden_channels = hidden_channels
self.conv_layer_1 = nn.Sequential(
nn.Conv2d(self.in_channels, self.hidden_channels, kernel_size=1, stride=1),
nn.BatchNorm2d(self.hidden_channels),
nn.ELU()
)
self.conv_layer_2 = nn.Sequential(
nn.Conv2d(self.hidden_channels, self.hidden_channels*3, kernel_size=1, stride=2),
nn.LeakyReLU(0.3),
)
self.conv_layer_3 = nn.Sequential(
nn.Conv2d(self.hidden_channels*3, self.hidden_channels*6, kernel_size=1, stride=2),
nn.LeakyReLU(0.3),
)
self.conv_layer_4 = nn.Sequential(
nn.Conv2d(self.hidden_channels*6, self.hidden_channels*9, kernel_size=1, stride=2),
nn.BatchNorm2d(self.hidden_channels*9),
nn.LeakyReLU(0.3),
)
self.conv_layer_5 = nn.Sequential(
nn.Conv2d(self.hidden_channels*9, self.out_channels, kernel_size=1, stride=1),
nn.LeakyReLU(0.3),
)
def forward(self, input):
output = self.conv_layer_1(input)
output = self.conv_layer_2(output)
output = self.conv_layer_3(output)
output = self.conv_layer_4(output)
output = self.conv_layer_5(output)
#output shape: torch.Size([10, 1, 508])
return (output, output.shape)
#define Feed Forward Neural network structure
class FeedForwardNetwork(nn.Module):
def __init__(self, in_features, out_features, hidden_size):
super(FeedForwardNetwork, self).__init__()
self.in_features = in_features
self.out_features = out_features
self.hidden_size = hidden_size
self.FC_layer_1 = nn.Sequential(
nn.Linear(self.in_features, self.hidden_size*4),
nn.BatchNorm1d(self.hidden_size*4),
nn.LeakyReLU(0.3),
)
self.FC_layer_2 = nn.Sequential(
nn.Linear(self.hidden_size*4, self.hidden_size*8),
nn.LeakyReLU(0.3),
)
self.FC_layer_3 = nn.Sequential(
nn.Linear(self.hidden_size*8, self.hidden_size*12),
nn.BatchNorm1d(self.hidden_size*12),
nn.LeakyReLU(0.3),
)
self.FC_layer_4 = nn.Sequential(
nn.Linear(self.hidden_size*12, self.out_features),
)
def forward(self, X):
output = self.FC_layer_1(X)
output = self.FC_layer_2(output)
output = self.FC_layer_3(output)
output = self.FC_layer_4(output)
return output
#Networks combine
class ModelNetwork(nn.Module):
def __init__(self, in_channels, out_channels,
out_features, hidden_channels,
hidden_features, dropout_rate):
super(ModelNetwork, self).__init__()
self.in_channels = in_channels
self.out_channels = out_channels
self.out_features = out_features
self.hidden_channels = hidden_channels
self.hidden_features = hidden_features
self.dropout_rate = dropout_rate
self.ff_network_in_shape = None
self.conv_network = ConvolutionalNework(self.in_channels, self.out_channels, self.hidden_channels)
self.__get_conv_shape()
self.ff_network = FeedForwardNetwork(self.ff_network_in_shape, self.out_features, self.hidden_features)
self.dropout_layer = nn.Dropout(self.dropout_rate)
def __get_conv_shape(self):
rand_sample_data = torch.randn(1, 3, 2, 2049)
_, shape = self.conv_network(rand_sample_data)
if(self.ff_network_in_shape == None):
self.ff_network_in_shape = shape[1]*shape[2]*shape[3]
def forward(self, input):
output, _ = self.conv_network(input)
output = self.dropout_layer(output)
output = output.reshape(-1, self.ff_network_in_shape)
output = self.ff_network(output)
output = self.dropout_layer(output)
return output
hyper parameters:
### set device for tensor computing
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#hyper_parameters
in_channels = 3
out_channels = 1
out_features = 2
hidden_channels_size = 6
hidden_feature_size = 200
dropout_rate = 0.2
EPOCHS = 100
batch_size = 600
learning_rate = 1e-3
#model utils
NNModel = ModelNetwork(
in_channels, out_channels, out_features,
hidden_channels_size, hidden_feature_size, dropout_rate
).to(device)
lossFunc = nn.CrossEntropyLoss()
optimizer = optim.Adam(NNModel.parameters(), lr = learning_rate)
#learning rate scheduler
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=30, verbose=True)
My training and testing phase is like so:
The total data sample for training is 560,000, however I’ve used only 5000 samples for training and 500 for testing. The targets numbers are evenly distributed for both real and fake
#model processes class
class ModelProcess():
def __init__(self, data_frame, train_size=0.98):
self.df = data_frame
self.train_size = train_size
self.df_dictionaries = self.__data_splice()
self.train_df, self.test_df = self.df_dictionaries['train_df'].head(5000), self.df_dictionaries['test_df'].head(500)
#scale by multiplying input vetcors by 10e+19
self.scaler = lambda vector:vector*10e+19
self.train_losses = list()
self.test_loss = None
self.saved_model_path = f'model_state-[{today.strftime("%b-%d-%Y")}].pth.tar'
#load data in batches specified
def __data_loader(self, df_to_be_loaded, start, stop):
df = df_to_be_loaded.iloc[start:stop]
df = df.sample(frac=1, random_state=40)
X_paths, y = df['data_path'].to_array(), df['data_targets'].to_array()
X = list()
for i in X_paths:
X.append(np.load(i))
X, y = np.array(X).get(), y.reshape(-1)
feature_extractor = FeatureExtractor(X)
X = feature_extractor.extract_features()
X, y = self.__vector_dtype_converter(X, y)
return self.scaler(X), y
#convert from numpy to tensor and vise versa
def __vector_dtype_converter(self, *data, to='tensor'):
if(to == 'tensor'):
tensor = tuple(torch.Tensor(i) for i in data)
return tensor
elif(to == 'numpy'):
numpy_array = tuple(np.array(i.detach()) for i in data)
return numpy_array
#train + test model
def run(self, epochs, train_batch_size):
NNModel.train()
correct_predictions, total_targets = 0, 0
for epoch in range(epochs):
print(f'epoch: {epoch}')
batch_losses = list()
batch_accuracies = list()
for idx in tqdm(range(0, len(self.train_df), train_batch_size)):
X, y = self.__data_loader(self.train_df, start=idx, stop=idx+train_batch_size)
NNModel.zero_grad()
pred = NNModel(X.to(device))
loss = lossFunc(pred, y.long().to(device))
batch_losses.append(loss.item())
loss.backward()
optimizer.step()
_, pred = torch.max(pred, 1)
correct_predictions += (pred.to('cpu') == y).sum().item()
total_targets += len(y)
batch_accuracy = (correct_predictions/total_targets)*100
batch_accuracies.append(batch_accuracy)
lr_scheduler.step()
mean_batch_loss = np.mean(np.array(batch_losses))
mean_batch_accuracies = np.mean(np.array(batch_accuracies))
self.train_losses.append(mean_batch_loss)
print(f'mean_batch_error: {mean_batch_loss} \n mean_batch_accuracy: {mean_batch_accuracies}%')
correct_predictions, total_targets = 0, 0
if(epoch%10 == 0):
self.__test(train_batch_size)
self.__model_save()
print('model saved successfully...')
#test model
def __test(self, test_batch_size):
NNModel.eval()
test_losses = list()
correct_predictions, total_targets = 0, 0
print('testing...')
with torch.no_grad():
for idx in tqdm(range(0, len(self.test_df), batch_size)):
X, y = self.__data_loader(self.test_df, start=idx, stop=idx+test_batch_size)
pred = NNModel(X.to(device))
loss = lossFunc(pred, y.long().to(device))
test_losses.append(loss.item())
_, pred = torch.max(pred, 1)
correct_predictions += (pred.to('cpu') == y).sum().item()
total_targets += len(y)
test_accuracy = (correct_predictions/total_targets)*100
print(f'test_error:{np.mean(np.array(test_losses))} test_accuracy: {test_accuracy}%')
#save model
def __model_save(self):
model_state = dict({
'model_state':NNModel.state_dict(),
'optimizer_state':optimizer.state_dict(),
})
torch.save(model_state, self.saved_model_path)
# splice data into training and testing set
def __data_splice(self):
data_size = len(self.df)
train_size = int(self.train_size*data_size)
train_df, test_df = self.df.iloc[:train_size+1], self.df.iloc[train_size+1:]
return {'train_df':train_df, 'test_df':test_df}
model_process = ModelProcess(data_frame=data_idx_df)
model_process.run(EPOCHS, batch_size)```