Hello I have the following ConvLSTM-Autoencoder model. I have been getting the error
RuntimeError: expected scalar type Double but found Float
I think it comes from convLSTM cell, but I just couldn’t figure it out. Task is n-step ahead frame prediction. Sequence length is 3 and we want to predict the future 2 frames
Here is the code for convLSTM:
class ConvLSTMCell(nn.Module):
def __init__(self, input_dim, hidden_dim, kernel_size, bias):
"""
Initialize ConvLSTM cell.
Parameters
----------
input_dim: int
Number of channels of input tensor.
hidden_dim: int
Number of channels of hidden state.
kernel_size: (int, int)
Size of the convolutional kernel.
bias: bool
Whether or not to add the bias.
"""
super(ConvLSTMCell, self).__init__()
self.input_dim = input_dim
self.hidden_dim = hidden_dim
self.kernel_size = kernel_size
self.padding = kernel_size[0] // 2, kernel_size[1] // 2
self.bias = bias
self.conv = nn.Conv2d(in_channels=self.input_dim + self.hidden_dim,
out_channels=4 * self.hidden_dim,
kernel_size=self.kernel_size,
padding=self.padding,
bias=self.bias)
def forward(self, input_tensor, cur_state):
h_cur, c_cur = cur_state
combined = torch.cat([input_tensor, h_cur], dim=1) # concatenate along channel axis
combined_conv = self.conv(combined)
cc_i, cc_f, cc_o, cc_g = torch.split(combined_conv, self.hidden_dim, dim=1)
i = torch.sigmoid(cc_i)
f = torch.sigmoid(cc_f)
o = torch.sigmoid(cc_o)
g = torch.tanh(cc_g)
c_next = f * c_cur + i * g
h_next = o * torch.tanh(c_next)
return h_next, c_next
def init_hidden(self, batch_size, image_size):
height, width = image_size
return (torch.zeros(batch_size, self.hidden_dim, height, width, device=self.conv.weight.device),
torch.zeros(batch_size, self.hidden_dim, height, width, device=self.conv.weight.device))
class ConvLSTM(nn.Module):
"""
Parameters:
input_dim: Number of channels in input
hidden_dim: Number of hidden channels
kernel_size: Size of kernel in convolutions
num_layers: Number of LSTM layers stacked on each other
batch_first: Whether or not dimension 0 is the batch or not
bias: Bias or no bias in Convolution
return_all_layers: Return the list of computations for all layers
Note: Will do same padding.
Input:
A tensor of size B, T, C, H, W or T, B, C, H, W
Output:
A tuple of two lists of length num_layers (or length 1 if return_all_layers is False).
0 - layer_output_list is the list of lists of length T of each output
1 - last_state_list is the list of last states
each element of the list is a tuple (h, c) for hidden state and memory
Example:
>> x = torch.rand((32, 10, 64, 128, 128))
>> convlstm = ConvLSTM(64, 16, 3, 1, True, True, False)
>> _, last_states = convlstm(x)
>> h = last_states[0][0] # 0 for layer index, 0 for h index
"""
def __init__(self, input_dim, hidden_dim, kernel_size, num_layers,
batch_first=False, bias=True, return_all_layers=False):
super(ConvLSTM, self).__init__()
self._check_kernel_size_consistency(kernel_size)
# Make sure that both `kernel_size` and `hidden_dim` are lists having len == num_layers
kernel_size = self._extend_for_multilayer(kernel_size, num_layers)
hidden_dim = self._extend_for_multilayer(hidden_dim, num_layers)
if not len(kernel_size) == len(hidden_dim) == num_layers:
raise ValueError('Inconsistent list length.')
self.input_dim = input_dim
self.hidden_dim = hidden_dim
self.kernel_size = kernel_size
self.num_layers = num_layers
self.batch_first = batch_first
self.bias = bias
self.return_all_layers = return_all_layers
cell_list = []
for i in range(0, self.num_layers):
cur_input_dim = self.input_dim if i == 0 else self.hidden_dim[i - 1]
cell_list.append(ConvLSTMCell(input_dim=cur_input_dim,
hidden_dim=self.hidden_dim[i],
kernel_size=self.kernel_size[i],
bias=self.bias))
self.cell_list = nn.ModuleList(cell_list)
def forward(self, input_tensor, hidden_state=None):
"""
Parameters
----------
input_tensor: todo
5-D Tensor either of shape (t, b, c, h, w) or (b, t, c, h, w)
hidden_state: todo
None. todo implement stateful
Returns
-------
last_state_list, layer_output
"""
if not self.batch_first:
# (t, b, c, h, w) -> (b, t, c, h, w)
input_tensor = input_tensor.permute(1, 0, 2, 3, 4)
b, _, _, h, w = input_tensor.size()
# Implement stateful ConvLSTM
if hidden_state is not None:
raise NotImplementedError()
else:
# Since the init is done in forward. Can send image size here
hidden_state = self._init_hidden(batch_size=b,
image_size=(h, w))
layer_output_list = []
last_state_list = []
seq_len = input_tensor.size(1)
cur_layer_input = input_tensor
for layer_idx in range(self.num_layers):
h, c = hidden_state[layer_idx]
output_inner = []
for t in range(seq_len):
h, c = self.cell_list[layer_idx](input_tensor=cur_layer_input[:, t, :, :, :],
cur_state=[h, c])
output_inner.append(h)
layer_output = torch.stack(output_inner, dim=1)
cur_layer_input = layer_output
layer_output_list.append(layer_output)
last_state_list.append([h, c])
if not self.return_all_layers:
layer_output_list = layer_output_list[-1:]
last_state_list = last_state_list[-1:]
return layer_output_list, last_state_list
def _init_hidden(self, batch_size, image_size):
init_states = []
for i in range(self.num_layers):
init_states.append(self.cell_list[i].init_hidden(batch_size, image_size))
return init_states
@staticmethod
def _check_kernel_size_consistency(kernel_size):
if not (isinstance(kernel_size, tuple) or
(isinstance(kernel_size, list) and all([isinstance(elem, tuple) for elem in kernel_size]))):
raise ValueError('`kernel_size` must be tuple or list of tuples')
@staticmethod
def _extend_for_multilayer(param, num_layers):
if not isinstance(param, list):
param = [param] * num_layers
return param
Code for AE:
class EncoderDecoderConvLSTM(nn.Module):
def __init__(self, nf, in_chan):
super(EncoderDecoderConvLSTM, self).__init__()
""" ARCHITECTURE
# Encoder (ConvLSTM)
# Encoder Vector (final hidden state of encoder)
# Decoder (ConvLSTM) - takes Encoder Vector as input
# Decoder (3D CNN) - produces regression predictions for our model
"""
## Input is 3 days
self.encoder_1_convlstm = ConvLSTMCell(input_dim=in_chan,
hidden_dim=nf,
kernel_size=(3, 3),
bias=True)
self.encoder_2_convlstm = ConvLSTMCell(input_dim=nf,
hidden_dim=nf,
kernel_size=(3, 3),
bias=True)
self.encoder_3_convlstm = ConvLSTMCell(input_dim=nf,
hidden_dim=nf,
kernel_size=(3, 3),
bias=True)
## Output is 2 days
self.decoder_1_convlstm = ConvLSTMCell(input_dim=nf, # nf + 1
hidden_dim=nf,
kernel_size=(3, 3),
bias=True)
self.decoder_2_convlstm = ConvLSTMCell(input_dim=nf,
hidden_dim=nf,
kernel_size=(3, 3),
bias=True)
"""
self.decoder_CNN = nn.Conv3d(in_channels=nf,
out_channels=1,
kernel_size=(1, 3, 3),
padding=(0, 1, 1))
"""
def autoencoder(self, x, seq_len, future_step, h_t, c_t, h_t2, c_t2, h_t3, c_t3, h_t4, c_t4, h_t5,c_t5):
outputs = []
# encoder
for t in range(seq_len):
h_t, c_t = self.encoder_1_convlstm(input_tensor=x[:, t, :, :],
cur_state=[h_t, c_t]) # we could concat to provide skip conn here
#print(input_tensor.size)
#print(cur_state.size)
h_t2, c_t2 = self.encoder_2_convlstm(input_tensor=h_t,
cur_state=[h_t2, c_t2]) # we could concat to provide skip conn here
h_t3, c_t3 = self.encoder_3_convlstm(input_tensor=h_t2,cur_state=[h_t3,c_t3])
# encoder_vector
encoder_vector = h_t3
# decoder
for t in range(future_step):
h_t4, c_t4 = self.decoder_1_convlstm(input_tensor=encoder_vector,
cur_state=[h_t4, c_t4]) # we could concat to provide skip conn here
h_t5, c_t5 = self.decoder_2_convlstm(input_tensor=h_t4,
cur_state=[h_t5, c_t5]) # we could concat to provide skip conn here
encoder_vector = h_t5
outputs += [h_t5] # predictions
outputs = torch.stack(outputs, 1)
outputs = outputs.permute(0, 2, 1, 3, 4)
#outputs = self.decoder_CNN(outputs)
#outputs = torch.nn.Sigmoid()(outputs)
return outputs
def forward(self, x, future_seq=0, hidden_state=None):
"""
Parameters
----------
input_tensor:
5-D Tensor of shape (b, t, c, h, w) # batch, time, channel, height, width
"""
# find size of different input dimensions
b, seq_len, _, h, w = x.size()
# initialize hidden states
h_t, c_t = self.encoder_1_convlstm.init_hidden(batch_size=b, image_size=(h, w))
h_t2, c_t2 = self.encoder_2_convlstm.init_hidden(batch_size=b, image_size=(h, w))
h_t3, c_t3 = self.encoder_2_convlstm.init_hidden(batch_size=b, image_size=(h, w))
h_t4, c_t4 = self.decoder_1_convlstm.init_hidden(batch_size=b, image_size=(h, w))
h_t5, c_t5 = self.decoder_2_convlstm.init_hidden(batch_size=b, image_size=(h, w))
# autoencoder forward
outputs = self.autoencoder(x, seq_len, future_seq, h_t, c_t, h_t2, c_t2, h_t3, c_t3, h_t4, c_t4, h_t5, c_t5)
return outputs
My model:
#---Model----#
class MyModel(pl.LightningModule):
def __init__(self, hparams=None, model=None):
super(MyModel, self).__init__()
# default config
self.normalize = False
self.model = model
# logging config
self.log_images = True
# Training config
self.criterion = torch.nn.MSELoss()
self.batch_size = 8
self.n_steps_past = 3
self.n_steps_ahead = 2 # 4
def forward(self, x):
output = self.model(x, future_seq=self.n_steps_ahead)
return output
def training_step(self, batch, batch_idx):
x, y = batch[0],batch[1]
#x = x.permute(0, 1, 4, 2, 3)
print("x shape: ",x.shape)
y = y.squeeze()
y_hat = self.forward(x).squeeze() # is squeeze neccessary?
loss = self.criterion(y_hat, y)
# save learning_rate
lr_saved = self.trainer.optimizers[0].param_groups[-1]['lr']
lr_saved = torch.scalar_tensor(lr_saved)
#.cuda()
# save predicted images every 250 global_step
"""
if self.log_images:
if self.global_step % 250 == 0:
final_image = self.create_video(x, y_hat, y)
self.logger.experiment.add_image(
'epoch_' + str(self.current_epoch) + '_step' + str(self.global_step) + '_generated_images',
final_image, 0)
plt.close()
"""
tensorboard_logs = {'train_mse_loss': loss,
'learning_rate': lr_saved}
return {'loss': loss, 'log': tensorboard_logs}
def test_step(self, batch, batch_idx):
# OPTIONAL
x, y = batch
y_hat = self.forward(x)
return {'test_loss': self.criterion(y_hat, y)}
def test_end(self, outputs):
# OPTIONAL
avg_loss = torch.stack([x['test_loss'] for x in outputs]).mean()
tensorboard_logs = {'test_loss': avg_loss}
return {'avg_test_loss': avg_loss, 'log': tensorboard_logs}
def configure_optimizers(self):
return torch.optim.Adam(self.parameters(), lr=0.001)
def train_dataloader(self):
#Create dataset
#Load train data
DL_train = DataLoader(train_ds)
return DL_train
def test_dataloader(self):
#Load test data
DL_test = DataLoader(test_ds)
return DL_test
def run_trainer():
conv_lstm_model = EncoderDecoderConvLSTM(nf=16, in_chan=29)
model = MyModel(model=conv_lstm_model)
#model = model.to(device='cuda')
trainer = Trainer(max_epochs=8)
trainer.fit(model)