I am dealing with Transformers time series forecasting using Pytorch
the model everything fin but the error like below
File C:\ProgramData\anaconda3\Lib\site-packages\torch\nn\modules\activation.py:1241 in forward
attn_output, attn_output_weights = F.multi_head_attention_forward(
File C:\ProgramData\anaconda3\Lib\site-packages\torch\nn\functional.py:5440 in multi_head_attention_forward
attn_output = scaled_dot_product_attention(q, k, v, attn_mask, dropout_p, is_causal)
RuntimeError: Expected attn_mask dtype to be bool or to match query dtype, but got attn_mask.dtype: float and query.dtype: double instead.
below is the whole procedure
1 - Preprocessing step
dim_val = 512
n_heads = 4
n_decoder_layers = 2
n_encoder_layers = 2
dec_seq_len = 1 # length of input given to decoder
enc_seq_len = 5 # length of input given to encoder
output_sequence_length = 1 # target sequence length. If hourly data and length = 48, you predict 2 days ahead
window_size = enc_seq_len + output_sequence_length # used to slice data into sub-sequences
step_size = 1 # Step size, i.e. how many time steps does the moving window move at each step
in_features_encoder_linear_layer = 2048
in_features_decoder_linear_layer = 2048
max_seq_len = enc_seq_len
batch_first = True
target_seq_len = 1
def is_ne_in_df(df:pd.DataFrame):
for col in df.columns:
true_bool = (df[col] == "n/e")
if any(true_bool):
return True
return False
def to_numeric_and_downcast_data(df: pd.DataFrame):
fcols = df.select_dtypes('float64').columns
icols = df.select_dtypes('integer').columns
df[fcols] = df[fcols].apply(pd.to_numeric, downcast='float64')
df[icols] = df[icols].apply(pd.to_numeric, downcast='integer')
return df
def process_missing_and_duplicate_timestamps(filepath, train_size=80, val_size=50, verbose=True):
df = pd.read_csv(filepath)
df.sort_values('Datetime', inplace=True)
df.reset_index(drop=True, inplace=True)
indices_to_remove = []
rows_to_add = []
hour_counter = 1
prev_date = ''
if verbose:
print(filepath)
for index, row in df.iterrows():
date_str = row['Datetime']
year_str = date_str[0:4]
month_str = date_str[5:7]
day_str = date_str[8:10]
hour_str = date_str[11:13]
tail_str = date_str[14:]
def date_to_str():
return '-'.join([year_str, month_str, day_str]) + ' ' + ':'.join([hour_str, tail_str])
def date_with_hour(hour):
hour = '0' + str(hour) if hour < 10 else str(hour)
return '-'.join([year_str, month_str, day_str]) + ' ' + ':'.join([hour, tail_str])
if hour_counter != int(hour_str):
if prev_date == date_to_str():
# Duplicate datetime, calculate the average and keep only one
average = int((df.iat[index, 1] + df.iat[index - 1, 1]) / 2) # Get the average
df.iat[index, 1] = average
indices_to_remove.append(index - 1)
if verbose:
print('Duplicate ' + date_to_str() + ' with average ' + str(average))
elif hour_counter < 23:
# Missing datetime, add it using the average of the previous and next for the consumption (MWs)
average = int((df.iat[index, 1] + df.iat[index - 1, 1]) / 2)
rows_to_add.append(pd.Series([date_with_hour(hour_counter), average], index=df.columns))
if verbose:
print('Missing ' + date_with_hour(hour_counter) + ' with average ' + str(average))
else:
print(date_to_str() + ' and hour_counter ' + str(hour_counter) + " with previous: " + prev_date)
# Adjust for the missing/duplicate value
if prev_date < date_to_str():
hour_counter = (hour_counter + 1) % 24
else:
hour_counter = (hour_counter - 1) if hour_counter - 1 > 0 else 0
# Increment the hour
hour_counter = (hour_counter + 1) % 24
prev_date = date_str
df.drop(indices_to_remove, inplace=True)
if rows_to_add:
new_rows = pd.concat(rows_to_add, axis=1).transpose()
df = pd.concat([df, new_rows], ignore_index=True) # Concatenating the new rows
# New rows are added at the end, sort them and also recalculate the indices
df.sort_values('Datetime', inplace=True)
df.reset_index(drop=True, inplace=True)
df = df.set_index("Datetime")
if is_ne_in_df(df):
raise ValueError("data frame contains 'n/e' values. These must be handled")
df = to_numeric_and_downcast_data(df)
train = df[:len(df) * train_size // 100]
val = df[len(train) : len(train) + ((len(df) - len(train)) * val_size) // 100]
test = df[len(val) + len(train) : ]
return df , train , val , test
windowing Technique
def get_indices_entire_sequence(data: pd.DataFrame, window_size: int, step_size: int) -> list:
stop_position = len(data)-1 # 1- because of 0 indexing
# Start the first sub-sequence at index position 0
subseq_first_idx = 0
subseq_last_idx = window_size
indices = []
while subseq_last_idx <= stop_position:
indices.append((subseq_first_idx, subseq_last_idx))
subseq_first_idx += step_size
subseq_last_idx += step_size
return indices
train_indices = get_indices_entire_sequence(data = train_set, window_size = window_size, step_size = step_size)
test_indices = get_indices_entire_sequence(data = test_set, window_size = window_size, step_size = step_size)
val_indices = get_indices_entire_sequence(data = val_set, window_size = window_size, step_size = step_size)
train_set = train_set.astype(np.float64)
test_set = test_set.astype(np.float64)
val_set = val_set.astype(np.float64)
scaler = MinMaxScaler(feature_range=(0 , 1))
train_set = scaler.fit_transform(train_set)
test_set = scaler.fit_transform(test_set)
val_set = scaler.transform(val_set)
train_set = torch.tensor(train_set)
test_set = torch.tensor(test_set)
val_set = torch.tensor(val_set)
def generate_square_subsequent_mask(dim1: int, dim2: int):
return torch.triu(torch.ones(dim1, dim2) * float('-inf'), diagonal=1)
class TransformerDataset(Dataset):
def __init__(self,
data: torch.tensor,
indices: list,
enc_seq_len: int,
dec_seq_len: int,
target_seq_len: int
) -> None:
super().__init__()
self.indices = indices
self.data = data
print("From get_src_trg: data size = {}".format(data.size()))
self.enc_seq_len = enc_seq_len
self.dec_seq_len = dec_seq_len
self.target_seq_len = target_seq_len
def __len__(self):
return len(self.indices)
def __getitem__(self, index):
# Get the first element of the i'th tuple in the list self.indicesasdfas
start_idx = self.indices[index][0]
# Get the second (and last) element of the i'th tuple in the list self.indices
end_idx = self.indices[index][1]
sequence = self.data[start_idx:end_idx]
src, trg, trg_y = self.get_src_trg(
sequence=sequence,
enc_seq_len=self.enc_seq_len,
dec_seq_len=self.dec_seq_len,
target_seq_len=self.target_seq_len
)
return src, trg, trg_y
def get_src_trg(
self,
sequence: torch.Tensor,
enc_seq_len: int,
dec_seq_len: int,
target_seq_len: int
) -> Tuple[torch.tensor, torch.tensor, torch.tensor]:
assert len(sequence) == enc_seq_len + target_seq_len, "Sequence length does not equal (input length + target length)"
src = sequence[:enc_seq_len]
trg = sequence[enc_seq_len-1:len(sequence)-1]
assert len(trg) == target_seq_len, "Length of trg does not match target sequence length"
# The target sequence against which the model output will be compared to compute loss
trg_y = sequence[-target_seq_len:]
assert len(trg_y) == target_seq_len, "Length of trg_y does not match target sequence length"
return src, trg, trg_y.squeeze(-1)
Transformers Encoder, Decoder and Positional Encoding layers
class PositionalEncoder(nn.Module):
def __init__( self, dropout: 0.1, max_seq_len: int=5000, d_model: int=512,batch_first: bool=True):
super().__init__()
self.d_model = d_model
self.dropout = nn.Dropout(p=dropout)
self.batch_first = batch_first
position = torch.arange(max_seq_len).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
if self.batch_first:
pe = torch.zeros(1, max_seq_len, d_model)
pe[0, :, 0::2] = torch.sin(position * div_term)
pe[0, :, 1::2] = torch.cos(position * div_term)
else:
pe = torch.zeros(max_seq_len, 1, d_model)
pe[:, 0, 0::2] = torch.sin(position * div_term)
pe[:, 0, 1::2] = torch.cos(position * div_term)
self.register_buffer('pe', pe)
def forward(self, x: Tensor) -> Tensor:
if self.batch_first:
x = x + self.pe[:,:x.size(1)]
else:
x = x + self.pe[:x.size(0)]
return self.dropout(x)
class TimeSeriesTransformer(nn.Module):
def __init__(self,
input_size: int,
dec_seq_len: int,
batch_first: bool,
out_seq_len: int=1,
dim_val: int=512,
n_encoder_layers: int=4,
n_decoder_layers: int=4,
n_heads: int=8,
dropout_encoder = 0.2,
dropout_decoder = 0.2,
dropout_pos_enc = 0.1,
dim_feedforward_encoder: int=2048,
dim_feedforward_decoder: int=2048,
num_predicted_features: int=1
):
super().__init__()
self.dec_seq_len = dec_seq_len
self.encoder_input_layer = nn.Linear(
in_features=input_size,
out_features=dim_val
)
self.decoder_input_layer = nn.Linear(
in_features=num_predicted_features,
out_features=dim_val
)
self.linear_mapping = nn.Linear(
in_features=dim_val,
out_features=num_predicted_features
)
# Create positional encoder
self.positional_encoding_layer = PositionalEncoder(
d_model=dim_val,
dropout=dropout_pos_enc
)
encoder_layer = nn.TransformerEncoderLayer(
d_model=dim_val,
nhead=n_heads,
dim_feedforward=dim_feedforward_encoder,
dropout=dropout_encoder,
batch_first=batch_first
)
self.encoder = nn.TransformerEncoder(
encoder_layer=encoder_layer,
num_layers=n_encoder_layers,
norm=None
)
decoder_layer = nn.TransformerDecoderLayer(
d_model=dim_val,
nhead=n_heads,
dim_feedforward=dim_feedforward_decoder,
dropout=dropout_decoder,
batch_first=batch_first
)
self.decoder = nn.TransformerDecoder(
decoder_layer=decoder_layer,
num_layers=n_decoder_layers,
norm=None
)
def forward(self, src: Tensor, tgt: Tensor, src_mask: Tensor=None,
tgt_mask: Tensor=None) -> Tensor:
src = self.encoder_input_layer(src)
src = self.positional_encoding_layer(src)
src = self.encoder(src=src)
decoder_output = self.decoder_input_layer(tgt)
decoder_output = self.decoder(
tgt=decoder_output,
memory=src,
tgt_mask=tgt_mask,
memory_mask=src_mask
)
decoder_output = self.linear_mapping(decoder_output)
return decoder_output
when i want to run the model
model = TimeSeriesTransformer(
input_size=1,
dec_seq_len=enc_seq_len,
batch_first=batch_first,
num_predicted_features=1
).to(torch.float64)
i, batch = next(enumerate(train_loader))
src, trg, trg_y = batch
output = model(
src=src,
tgt=trg,
src_mask=src_mask,
tgt_mask=tgt_mask
)
raised error is like below
File C:\ProgramData\anaconda3\Lib\site-packages\torch\nn\modules\activation.py:1241 in forward
attn_output, attn_output_weights = F.multi_head_attention_forward(
File C:\ProgramData\anaconda3\Lib\site-packages\torch\nn\functional.py:5440 in multi_head_attention_forward
attn_output = scaled_dot_product_attention(q, k, v, attn_mask, dropout_p, is_causal)
RuntimeError: Expected attn_mask dtype to be bool or to match query dtype, but got attn_mask.dtype: float and query.dtype: double instead.