Hi everyone,
I was trying to implement a transformer regression model and I'm running into a tensor size issue with embed_dim. My input_dim is 107 and my embed_dim is 108; I bumped embed_dim up so that it is divisible by num_heads.
Below is the error:
File "C:\Users\ng_mi\Nus\Portfolio-Optimization\code\transformer.py", line 22, in forward
x = x + self.position_embedding(position_ids)
~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
RuntimeError: The size of tensor a (107) must match the size of tensor b (108) at non-singleton dimension 2
I know that input_dim and embed_dim usually need to match, and that embed_dim needs to be divisible by num_heads. Since 107 is prime, no num_heads between 2 and 8 divides it, so I don't see how to satisfy both conditions at once; I've sketched one workaround idea after the model code below.
Below is the relevant part of the code, and here is my repo link: ngminteck/Portfolio-Optimization: Portfolio Optimization (github.com)
import torch
import torch.nn as nn
from torch.utils.data import Subset


class TransformerEncoder(nn.Module):
    def __init__(self, input_dim, embed_dim, num_heads, num_layers, dropout=0.1):
        super(TransformerEncoder, self).__init__()
        self.position_embedding = nn.Embedding(input_dim, embed_dim)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=embed_dim, nhead=num_heads, dropout=dropout
            ),
            num_layers=num_layers,
        )

    def forward(self, x):
        # Add positional encoding -- this is the line from the traceback:
        # x still has input_dim (107) features, but the embedding vectors
        # are embed_dim (108) wide, so the addition cannot broadcast
        x = x + self.position_embedding(torch.arange(x.size(1)).unsqueeze(0).to(x.device))
        # Pass through transformer encoder
        x = self.transformer_encoder(x)
        return x


class TransformerDecoder(nn.Module):
    def __init__(self, embed_dim, output_dim, num_heads, num_layers, dropout=0.1):
        super(TransformerDecoder, self).__init__()
        self.transformer_decoder = nn.TransformerDecoder(
            nn.TransformerDecoderLayer(
                d_model=embed_dim, nhead=num_heads, dropout=dropout
            ),
            num_layers=num_layers,
        )
        self.fc = nn.Linear(embed_dim, output_dim)

    def forward(self, x, encoder_output):
        # Pass through transformer decoder
        x = self.transformer_decoder(x, encoder_output)
        # Linear layer for output
        x = self.fc(x)
        return x


class TransformerModel(nn.Module):
    def __init__(self, input_dim, embed_dim, num_heads, num_layers, output_dim, dropout=0.1, is_classification=False):
        super(TransformerModel, self).__init__()
        self.encoder = TransformerEncoder(input_dim, embed_dim, num_heads, num_layers, dropout)
        self.is_classification = is_classification
        if is_classification:
            self.decoder = TransformerDecoder(embed_dim, output_dim, num_heads, num_layers, dropout)

    def forward(self, x):
        print(f'Type of x: {type(x)}')
        if isinstance(x, Subset):
            x = torch.stack([x[i] for i in range(len(x))])
        x = self.encoder(x)
        if self.is_classification:
            x = self.decoder(x, x)
            x = x[:, -1, :]  # Take the last token's output for classification
        return x
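One idea I'm considering is to drop the input_dim == embed_dim requirement entirely and project the features up to embed_dim before adding the positional embedding. Here is a minimal, untested sketch of what I mean, assuming the input is shaped (batch, seq_len, input_dim); input_proj and max_seq_len are names I made up for this sketch:

import torch
import torch.nn as nn

class ProjectedTransformerEncoder(nn.Module):
    def __init__(self, input_dim, embed_dim, num_heads, num_layers, max_seq_len=512, dropout=0.1):
        super(ProjectedTransformerEncoder, self).__init__()
        # Learned projection: decouples the raw feature width (107) from
        # d_model, so embed_dim only has to be divisible by num_heads
        self.input_proj = nn.Linear(input_dim, embed_dim)
        # Positions index sequence steps, so size the table by the
        # maximum sequence length rather than by input_dim
        self.position_embedding = nn.Embedding(max_seq_len, embed_dim)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=embed_dim, nhead=num_heads, dropout=dropout, batch_first=True
            ),
            num_layers=num_layers,
        )

    def forward(self, x):
        # x: (batch, seq_len, input_dim)
        x = self.input_proj(x)  # -> (batch, seq_len, embed_dim)
        positions = torch.arange(x.size(1), device=x.device).unsqueeze(0)
        x = x + self.position_embedding(positions)
        return self.transformer_encoder(x)

With that, embed_dim=108 and num_heads=4 should work even though input_dim=107, e.g.:

enc = ProjectedTransformerEncoder(input_dim=107, embed_dim=108, num_heads=4, num_layers=2)
out = enc(torch.randn(8, 30, 107))  # -> (8, 30, 108)

Does that sound like the right approach, or is there a more standard way to handle this?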
import optuna
import json
import shutil
import numpy as np
import torch.optim as optim
from torch.utils.data import DataLoader
from directory_manager import *
from optuna_config import *
from transformer import *
from sequence_length import *

Model_Type = "transformer_regression"


def transformer_regression_hyperparameters_search(X, y, gpu_available, ticker_symbol):
    device = torch.device('cuda' if gpu_available and torch.cuda.is_available() else 'cpu')

    # Convert DataFrame to tensors
    X_tensor = torch.tensor(X.values, dtype=torch.float32).to(device)
    y_tensor = torch.tensor(y.values, dtype=torch.float32).unsqueeze(1).to(device)

    # Split data into training and validation sets.
    # Use one shared permutation so the X and y splits stay aligned
    # (calling random_split twice would draw two different shuffles).
    train_size = int(0.8 * len(X_tensor))
    indices = torch.randperm(len(X_tensor))
    train_idx, val_idx = indices[:train_size], indices[train_size:]
    input_train, input_val = X_tensor[train_idx], X_tensor[val_idx]
    target_train, target_val = y_tensor[train_idx], y_tensor[val_idx]

    def transformer_regression_objective(trial):
        num_heads = trial.suggest_int('num_heads', 2, 8)
        num_layers = trial.suggest_int('num_layers', 2, 6)
        dropout = trial.suggest_float('dropout_rate', 0.1, 0.5)
        lr = trial.suggest_float('lr', 1e-5, 1e-1)
        epochs = 1000
        patience = 10

        input_dim = X_tensor.shape[1]
        # Round input_dim up to the nearest multiple of num_heads
        embed_dim = ((input_dim + num_heads - 1) // num_heads) * num_heads

        model = TransformerModel(input_dim=input_dim, embed_dim=embed_dim, num_heads=num_heads, num_layers=num_layers, output_dim=1, dropout=dropout, is_classification=False).to(device)
        optimizer = optim.Adam(model.parameters(), lr=lr)
        criterion = nn.MSELoss()

        best_val_rmse = np.inf
        epochs_no_improve = 0
        for epoch in range(epochs):
            model.train()
            optimizer.zero_grad()
            output = model(input_train)
            loss = criterion(output, target_train)
            loss.backward()
            optimizer.step()

            model.eval()
            with torch.no_grad():
                val_output = model(input_val)
                val_rmse = torch.sqrt(criterion(val_output, target_val)).item()

            # Report intermediate objective value
            trial.report(val_rmse, epoch)
            # Prune unpromising trials
            if trial.should_prune():
                raise optuna.TrialPruned()

            # Early stopping on validation RMSE
            if val_rmse < best_val_rmse:
                best_val_rmse = val_rmse
                epochs_no_improve = 0
            else:
                epochs_no_improve += 1
                if epochs_no_improve >= patience:
                    break

        return best_val_rmse

    study = optuna.create_study(direction='minimize', pruner=optuna.pruners.MedianPruner())
    study.optimize(transformer_regression_objective, n_trials=MAX_TRIALS)
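For reference, this is how I checked that the rounding formula in the objective can never give embed_dim == 107 for these head counts (107 is prime), which is exactly what triggers the mismatch:

# Smallest multiple of num_heads that is >= input_dim (same formula as above)
for num_heads in range(2, 9):
    embed_dim = ((107 + num_heads - 1) // num_heads) * num_heads
    print(num_heads, embed_dim)
# prints 108, 108, 108, 110, 108, 112, 112 -- never 107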