import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import numpy as np
# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Define an improved neural network class
class ImprovedNN(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size):
        super(ImprovedNN, self).__init__()
        layers = []
        current_size = input_size
        # Build one Linear + LeakyReLU block per entry in hidden_sizes
        for hidden_size in hidden_sizes:
            layers.append(nn.Linear(current_size, hidden_size))
            layers.append(nn.LeakyReLU())
            current_size = hidden_size
        layers.append(nn.Linear(current_size, output_size))
        self.model = nn.Sequential(*layers)
        self._initialize_weights()

    def forward(self, x):
        return self.model(x)

    def _initialize_weights(self):
        # Kaiming initialization for the linear weights, zeros for the biases
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.kaiming_normal_(m.weight)
                nn.init.zeros_(m.bias)
# Preprocess data
def preprocess_data(x_train, y_train, x_val, y_val, scaling_method='standard'):
    if scaling_method == 'standard':
        scaler_x = StandardScaler()
        scaler_y = StandardScaler()
    elif scaling_method == 'minmax':
        scaler_x = MinMaxScaler()
        scaler_y = MinMaxScaler()
    else:
        raise ValueError("Invalid scaling method. Use 'standard' or 'minmax'.")
    # Fit the scalers on the training data only, then apply them to the validation data
    x_train = scaler_x.fit_transform(x_train)
    y_train = scaler_y.fit_transform(y_train.reshape(-1, 1))
    x_val = scaler_x.transform(x_val)
    y_val = scaler_y.transform(y_val.reshape(-1, 1))
    return (torch.tensor(x_train, dtype=torch.float32),
            torch.tensor(y_train, dtype=torch.float32),
            torch.tensor(x_val, dtype=torch.float32),
            torch.tensor(y_val, dtype=torch.float32),
            scaler_y)
# Save model
def save_model(model, filepath):
    torch.save(model.state_dict(), filepath)
# Load model
def load_model(model, filepath):
    # map_location keeps loading working even if the checkpoint was saved on a different device
    model.load_state_dict(torch.load(filepath, map_location=device))
    model.eval()
# Train model
def train_model(model, train_loader, val_loader, epochs=1000, initial_lr=0.05, save_path='improved_nn.pth'):
    model.to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=initial_lr)
    best_val_loss = float('inf')
    for epoch in range(epochs):
        model.train()
        epoch_loss = 0.0
        for data, targets in train_loader:
            data, targets = data.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(data)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        epoch_loss /= len(train_loader)
        # Evaluate on the validation set every 100 epochs
        if (epoch + 1) % 100 == 0:
            model.eval()
            val_loss = 0.0
            with torch.no_grad():
                for val_data, val_targets in val_loader:
                    val_data, val_targets = val_data.to(device), val_targets.to(device)
                    val_outputs = model(val_data)
                    val_loss += criterion(val_outputs, val_targets).item()
            val_loss /= len(val_loader)
            print(f'Epoch [{epoch+1}/{epochs}], Training Loss: {epoch_loss:.8f}, Validation Loss: {val_loss:.8f}')
            # When a new best validation loss is found, print per-sample predictions and checkpoint the model
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                print('Training set predictions:')
                with torch.no_grad():
                    for data, targets in train_loader:
                        data, targets = data.to(device), targets.to(device)
                        train_predictions = model(data)
                        for i in range(data.size(0)):
                            input_str = ', '.join(f"{x:.2f}".rstrip('0').rstrip('.') for x in data[i].tolist())
                            target_str = f"{targets[i].item():.4f}".rstrip('0').rstrip('.')
                            prediction_str = f"{train_predictions[i].item():.4f}".rstrip('0').rstrip('.')
                            loss_str = f"{criterion(train_predictions[i], targets[i]).item():.8f}"
                            print(f'Input: [{input_str}], Target: {target_str}, Prediction: {prediction_str}, Loss: {loss_str}')
                    print('Validation set predictions:')
                    for val_data, val_targets in val_loader:
                        val_data, val_targets = val_data.to(device), val_targets.to(device)
                        val_predictions = model(val_data)
                        for i in range(val_data.size(0)):
                            input_str = ', '.join(f"{x:.2f}".rstrip('0').rstrip('.') for x in val_data[i].tolist())
                            target_str = f"{val_targets[i].item():.4f}".rstrip('0').rstrip('.')
                            prediction_str = f"{val_predictions[i].item():.4f}".rstrip('0').rstrip('.')
                            loss_str = f"{criterion(val_predictions[i], val_targets[i]).item():.8f}"
                            print(f'Input: [{input_str}], Target: {target_str}, Prediction: {prediction_str}, Loss: {loss_str}')
                save_model(model, save_path)
    # Restore the best checkpoint before returning
    load_model(model, save_path)
    return model
# Inverse transform predictions
def inverse_transform(scaler, data):
    return scaler.inverse_transform(data.reshape(-1, 1)).flatten()
if __name__ == "__main__":
    input_size = 2
    hidden_sizes = [4, 4]  # Change this list to set the number of hidden layers and neurons
    output_size = 1
    initial_lr = 0.05
    epochs = 1000
    model_filepath = 'improved_nn.pth'
    batch_size = 16
    x_train = np.array([
        [1.0, 2.0], [2.0, 3.0], [3.0, 4.0], [-5.0, 6.0], [-2.56, 6.0], [4.0, 5.0],
        [10.0, 11.0], [15.5, 16.5], [-20.0, -30.0], [25.75, 26.25], [-35.5, 40.5], [45.0, 46.0],
        [50.0, -60.0], [-70.0, 80.0], [90.1, 100.2], [110.3, -120.4], [-130.5, 140.6], [150.7, 160.8],
        [170.9, -180.1], [190.2, 200.3], [-210.4, 220.5], [230.6, -240.7], [-250.8, 260.9], [270.1, 280.2],
        [290.3, -300.4], [-310.5, 320.6], [330.7, 340.8], [350.9, -360.1], [370.2, 380.3], [-390.4, 400.5],
        [410.6, -420.7], [-430.8, 440.9], [450.1, 460.2], [470.3, -480.4], [-490.5, 500.6], [510.7, 520.8],
        [530.9, -540.1], [550.2, 560.3], [-570.4, 580.5], [590.6, -600.7], [-610.8, 620.9], [630.1, 640.2],
        [650.3, -660.4], [-670.5, 680.6], [690.7, 700.8], [710.9, -720.1], [730.2, 740.3], [-750.4, 760.5],
        [770.6, -780.7], [-790.8, 800.9], [810.1, 820.2], [830.3, -840.4], [-850.5, 860.6], [870.7, 880.8]
    ])
    y_train = np.array([
        3.0, 5.0, 7.0, 1.0, 3.44, 9.0,
        21.0, 32.0, -50.0, 52.0, 5.0, 91.0,
        -10.0, 10.0, 190.3, -10.1, 10.1, 311.5,
        -9.2, 390.5, 10.1, -10.1, 10.1, 550.3,
        -10.1, 330.2, 671.5, -10.1, 540.5, 10.1,
        -10.1, 871.5, 911.5, -10.1, 110.1, 990.3,
        -10.1, 220.3, 5.0, -10.1, 540.3, 1100.5,
        -10.1, 331.2, 450.2, -10.1, 550.3, 620.5,
        -10.1, 770.5, 810.1, -10.1, 220.3, 550.3
    ])
    x_val = np.array([
        [5.0, 6.0], [6.0, 7.0], [8.5, 9.5], [-10.5, 11.5], [-12.5, 13.5], [14.5, 15.5],
        [16.5, 17.5], [-18.5, 19.5], [20.5, 21.5], [-22.5, 23.5], [24.5, 25.5], [-26.5, 27.5]
    ])
    y_val = np.array([
        11.0, 13.0, 18.0, 1.0, 1.0, 30.0,
        34.0, 1.0, 42.0, 1.0, 50.0, 1.0
    ])
    # Preprocess the data
    x_train, y_train, x_val, y_val, scaler_y = preprocess_data(x_train, y_train, x_val, y_val, scaling_method='standard')
    # Create data loaders
    train_dataset = TensorDataset(x_train, y_train)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_dataset = TensorDataset(x_val, y_val)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    # Initialize the model
    model = ImprovedNN(input_size, hidden_sizes, output_size).to(device)
    try:
        load_model(model, model_filepath)
        print(f'Model loaded from {model_filepath}')
    except (FileNotFoundError, RuntimeError):
        print(f'No compatible saved model found at {model_filepath}. Training a new model.')
        # Train the model
        model = train_model(model, train_loader, val_loader, epochs=epochs, initial_lr=initial_lr, save_path=model_filepath)
    # Save and load the model
    save_model(model, model_filepath)
    print(f'Model saved to {model_filepath}')
    load_model(model, model_filepath)
    print(f'Model loaded from {model_filepath}')
    # Test data and predictions
    test_data = torch.tensor([[10.0, 20.0], [30.0, 40.0]], dtype=torch.float32)
    test_data = test_data.to(device)
    predictions = model(test_data)
    predictions = inverse_transform(scaler_y, predictions.cpu().detach().numpy())
    predictions_formatted = [f"{p:.4f}".rstrip('0').rstrip('.') for p in predictions]
    print(f'Predictions: {predictions_formatted}')
I'm using VS Code. When I added a system for preprocessing the training data, something went wrong, and I don't know how to fix it, so any help is appreciated!
I'm also not sure where the preprocessing pieces should go in the script.
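My only guess so far (not verified) is that the test inputs at the bottom never go through the same scaler_x that was fit on the training data, and that preprocess_data doesn't return scaler_x at all, so the final prediction runs on raw values while the network was trained on standardized ones. If that's the problem, I imagine the inference part inside the if __name__ == "__main__": block would need to look roughly like the sketch below, assuming preprocess_data were changed to also return scaler_x:

    # Sketch only (untested): assumes preprocess_data is extended to also return scaler_x, e.g.
    #   return x_train_t, y_train_t, x_val_t, y_val_t, scaler_x, scaler_y
    test_raw = np.array([[10.0, 20.0], [30.0, 40.0]])
    test_scaled = scaler_x.transform(test_raw)  # scale test inputs exactly like the training inputs
    test_tensor = torch.tensor(test_scaled, dtype=torch.float32).to(device)
    with torch.no_grad():
        scaled_predictions = model(test_tensor)
    # Map the scaled outputs back to the original target units
    predictions = inverse_transform(scaler_y, scaled_predictions.cpu().numpy())
    print(f'Predictions: {predictions}')

Is that the right direction, or is the problem somewhere else in how I'm preprocessing?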