Building cov1d model with residual block + hypermeter tunner

Hi, I’m working on a stock forecasting project and trying to build a Conv1D model using residual blocks. I’m using Optuna to find the best hyperparameters and save the best model. However, I’m facing a shape/channel error when running the trial.

Some printout output
Using device: cuda
process_conv1d - Input shape after reshaping: (163, 1, 100)
process_conv1d - Training data shape: (130, 1, 100), Validation data shape: (33, 1, 100)
conv1d_objective - Model initialized with in_channels=1, out_channels=105, kernel_size=4, num_blocks=6
Conv1DModel - Input shape: torch.Size([130, 1, 100])
ResidualBlock - Input shape: torch.Size([130, 1, 100])
ResidualBlock - After conv1: torch.Size([130, 105, 100])
ResidualBlock - After bn1: torch.Size([130, 105, 100])
ResidualBlock - After relu1: torch.Size([130, 105, 100])
ResidualBlock - After conv2: torch.Size([130, 105, 100])
ResidualBlock - After bn2: torch.Size([130, 105, 100])
ResidualBlock - After adding residual: torch.Size([130, 105, 100])
ResidualBlock - After relu2: torch.Size([130, 105, 100])

Error Message
RuntimeError: Given groups=1, weight of size [105, 1, 1], expected input[130, 105, 100] to have 1 channels, but got 105 channels instead
[W 2024-09-04 12:14:32,605] Trial 0 failed with value None.

Some part of code

import os
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import optuna
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, root_mean_squared_error , mean_squared_error



def preprocess_data(df):
    if df.isna().sum().sum() > 0 or df.isin([float('inf'), float('-inf')]).sum().sum() > 0:
        df = df.replace([float('inf'), float('-inf')], float('nan')).dropna()

    df = df.dropna()

    columns_to_drop = [
        'NEXT_DAY_CLOSEPRICE', 'DAILY_CLOSEPRICE_CHANGE', 'CLOSEPRICE_DIRECTION',
        'DAILY_MIDPRICE', 'NEXT_DAY_MIDPRICE', 'DAILY_MIDPRICE_CHANGE', 'MIDPRICE_DIRECTION', 'Date'
    ]
    X = df.drop(columns=columns_to_drop)
    y_classifier = (df['DAILY_CLOSEPRICE_CHANGE'] > 0).astype(int)
    y_regressor = df['DAILY_CLOSEPRICE_CHANGE']

    return X, y_classifier, y_regressor

class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, l2_lambda=0.01):
        super(ResidualBlock, self).__init__()
        
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size, stride=1, padding='same')
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size, stride=1, padding='same')
        self.bn2 = nn.BatchNorm1d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        
        nn.init.kaiming_normal_(self.conv1.weight, nonlinearity='relu')
        nn.init.kaiming_normal_(self.conv2.weight, nonlinearity='relu')
        nn.init.zeros_(self.conv1.bias)
        nn.init.zeros_(self.conv2.bias)
        
        self.l2_lambda = l2_lambda

        if in_channels != out_channels:
            self.residual_conv = nn.Conv1d(in_channels, out_channels, kernel_size=1, stride=1)
        else:
            self.residual_conv = nn.Identity()

    def forward(self, x):
        residual = self.residual_conv(x)
        print(f'ResidualBlock - Input shape: {x.shape}')
        out = self.conv1(x)
        print(f'ResidualBlock - After conv1: {out.shape}')
        out = self.bn1(out)
        print(f'ResidualBlock - After bn1: {out.shape}')
        out = self.relu(out)
        print(f'ResidualBlock - After relu1: {out.shape}')
        out = self.conv2(out)
        print(f'ResidualBlock - After conv2: {out.shape}')
        out = self.bn2(out)
        print(f'ResidualBlock - After bn2: {out.shape}')
        out += residual
        print(f'ResidualBlock - After adding residual: {out.shape}')
        out = self.relu(out)
        print(f'ResidualBlock - After relu2: {out.shape}')
        
        return out

class Conv1DModel(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, num_blocks=1, l2_lambda=0.01, classification=True):
        super(Conv1DModel, self).__init__()
        self.blocks = nn.Sequential(
            *[ResidualBlock(in_channels, out_channels, kernel_size, l2_lambda=l2_lambda) for _ in range(num_blocks)]
        )
        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)  # Global average pooling for 1D
        self.fc = nn.Linear(out_channels, 2 if classification else 1)
        self.classification = classification

    def forward(self, x):
        print(f'Conv1DModel - Input shape: {x.shape}')
        out = self.blocks(x)
        print(f'Conv1DModel - After residual blocks: {out.shape}')
        out = self.global_avg_pool(out)
        print(f'Conv1DModel - After global average pooling: {out.shape}')
        out = out.view(out.size(0), -1)  # Flatten the tensor
        print(f'Conv1DModel - After flattening: {out.shape}')
        out = self.fc(out)
        print(f'Conv1DModel - After fully connected layer: {out.shape}')
        if self.classification:
            out = F.log_softmax(out, dim=1)
            print(f'Conv1DModel - After log_softmax: {out.shape}')
        return out

ef process_conv1d(X, y, classification, gpu_available, ticker):
    device = torch.device('cuda' if gpu_available and torch.cuda.is_available() else 'cpu')
    print(f'Using device: {device}')
    
    # Convert DataFrame to numpy array
    X = X.to_numpy()
    y = y.to_numpy()
    
    # Reshape X for Conv1D
    NUM_CHANNELS = 1
    X = X.reshape((X.shape[0], NUM_CHANNELS, -1))  # Reshape for Conv1D: (batch_size, num_channels, sequence_length)
    print(f'process_conv1d - Input shape after reshaping: {X.shape}')
    
    # Split data into training and validation sets
    TEST_SIZE = 0.2
    RANDOM_STATE = 42
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE)
    print(f'process_conv1d - Training data shape: {X_train.shape}, Validation data shape: {X_val.shape}')
    
    def conv1d_objective(trial):
        in_channels = X_train.shape[1]  # Ensure this matches the reshaped input
        out_channels = trial.suggest_int('out_channels', 16, 128)
        kernel_size = trial.suggest_int('kernel_size', 3, 7)
        num_blocks = trial.suggest_int('num_blocks', 1, 10)
        l2_lambda = trial.suggest_float('l2_lambda', 1e-5, 1e-2)
        
        model = Conv1DModel(in_channels, out_channels, kernel_size, num_blocks, l2_lambda, classification).to(device)
        print(f'conv1d_objective - Model initialized with in_channels={in_channels}, out_channels={out_channels}, kernel_size={kernel_size}, num_blocks={num_blocks}')
        
        optimizer = optim.Adam(model.parameters(), lr=trial.suggest_float('lr', 1e-5, 1e-2), weight_decay=l2_lambda)
        criterion = nn.CrossEntropyLoss() if classification else nn.MSELoss()
        
        input_train = torch.tensor(X_train, dtype=torch.float32).to(device)
        target_train = torch.tensor(y_train, dtype=torch.long if classification else torch.float32).to(device)
        
        model.train()
        EPOCHS = 10
        for epoch in range(EPOCHS):
            optimizer.zero_grad()
            output = model(input_train)
            print(f'conv1d_objective - Epoch {epoch}: Output shape: {output.shape}')
            loss = criterion(output, target_train)
            loss.backward()
            optimizer.step()
        
        # Validation
        model.eval()
        input_val = torch.tensor(X_val, dtype=torch.float32).to(device)
        target_val = torch.tensor(y_val, dtype=torch.long if classification else torch.float32).to(device)
        with torch.no_grad():
            val_output = model(input_val)
            val_loss = criterion(val_output, target_val)
            if classification:
                val_accuracy = (val_output.argmax(dim=1) == target_val).float().mean().item()
                print(f'conv1d_objective - Trial {trial.number}: Validation Accuracy = {val_accuracy:.4f}, Validation Loss = {val_loss.item():.4f}')
                return 1 - val_accuracy
            else:
                val_mse = val_loss.item()
                print(f'conv1d_objective - Trial {trial.number}: Validation MSE = {val_mse:.4f}, Validation Loss = {val_loss.item():.4f}')
                return val_mse
    
    study = optuna.create_study(direction='minimize')
    study.optimize(conv1d_objective, n_trials=100)
    
    best_model = Conv1DModel(X.shape[1], study.best_params['out_channels'], study.best_params['kernel_size'], study.best_params['num_blocks'], study.best_params['l2_lambda'], classification).to(device)
    
    # Save the best model
    model_type = 'classification' if classification else 'regression'
    torch.save(best_model.state_dict(), f'../models/pytorch/conv1d-{model_type}/{ticker}.pth')

for ticker in ticker_list:
    dataframe = pd.read_csv(f"../data/{ticker}.csv")
    X, y_classifier, y_regressor = preprocess_data(dataframe)
    process_conv1d(X, y_classifier, True, gpu_available, ticker)


This definition:

        self.blocks = nn.Sequential(
            *[ResidualBlock(in_channels, out_channels, kernel_size, l2_lambda=l2_lambda) for _ in range(num_blocks)]
        )

won’t work for multiple ResidualBlocks since each of these will expect an input with in_channels channels while the previous one will return an activation with out_channels.
You could fix it e.g. via:

        self.blocks = nn.Sequential(
            ResidualBlock(in_channels, out_channels, kernel_size, l2_lambda=l2_lambda),
            *[ResidualBlock(out_channels, out_channels, kernel_size, l2_lambda=l2_lambda) for _ in range(num_blocks-1)]
        )

but would need to double check if the “internal” ResidualBlocks should use out_channels for their input and output.

1 Like