# Transformer learns off-by-one categorization occasionally (now with code)

Hi all,

I have a strange problem with my transformer model that I am trying to train in PyTorch for self-educational purposes. I realize transformer models are a poor choice for time series, but nevertheless I downloaded daily S&P 500 closing price data from Yahoo Finance going back 10 years. Then I took ratios of adjacent prices and categorized the ratios into 20 bins according to an exponential function, so that precision is higher for smaller price movements. I feed the model a window of the past, say 5, ratio bins and expect it to predict the next ratio bin. The problem is, the model seems to train OK about 75% of the time, but in the remaining (significant) proportion of runs it trains off by one. That is, the categories output by the trained model are literally off by one versus when it trains “correctly” (or rather “optimally”). Sometimes it is even off by two, but that is rarer.

In my last post I elided the code, but here it is now. Hopefully someone can spot what might be going wrong!

``````import math
import time
import random
import sys
import csv
import torch
import numpy as np
from torch import nn
import matplotlib.pyplot as plt

# Exponential bin edges: resolution is finest for ratios near 1.0.
mags_lower = [1.0 - (((100.0 ** (x / 10)) - 1) / (100 - 1)) / 2 for x in range(1, 11)]  # ~0.9987 down to 0.5
mags_upper = [(((100.0 ** (x / 10)) - 1) / (100 - 1)) + 1.0 for x in range(1, 11)]      # ~1.0026 up to 2.0

using_embedding = True   # feed bin ids through nn.Embedding instead of one-hot + Linear
batching = 30            # expected batch size (one sample per class bucket)
window = 5               # how many past ratio bins the model sees per example
fudge = 9                # pad vocabulary "to make multiple of 10" (author's note)
vocab_size = len(mags_lower) + len(mags_upper) + 1 + fudge
passes = 1000            # training iterations
learning_rate = 1e-5
features = 10            # transformer d_model
layers = 2               # number of encoder layers

def get_emb(sin_inp):
    """Interleave sin and cos of the phase tensor along a new last axis,
    then flatten the pair dimension back into the channel dimension."""
    interleaved = torch.stack((torch.sin(sin_inp), torch.cos(sin_inp)), dim=-1)
    return interleaved.flatten(-2, -1)

class PositionalEncoding1D(nn.Module):
    """Sinusoidal positional encoding for (batch, seq, channel) tensors.

    forward() returns ONLY the positional signal (shaped like its input);
    the caller is responsible for combining it with the data. The last
    computed encoding is cached and reused while the input shape is stable.
    """

    def __init__(self, channels):
        super(PositionalEncoding1D, self).__init__()
        self.org_channels = channels
        # Round up to an even channel count so sin/cos pairs fit exactly.
        padded = int(np.ceil(channels / 2) * 2)
        self.channels = padded
        freqs = 1.0 / (10000 ** (torch.arange(0, padded, 2).float() / padded))
        self.register_buffer("inv_freq", freqs)
        self.register_buffer("cached_penc", None, persistent=False)

    def forward(self, tensor):
        """Return a (batch, seq, channel) positional encoding matching `tensor`."""
        if len(tensor.shape) != 3:
            raise RuntimeError("The input tensor has to be 3d!")
        # Reuse the cached encoding while batch/seq/channel are unchanged.
        if self.cached_penc is not None and self.cached_penc.shape == tensor.shape:
            return self.cached_penc
        self.cached_penc = None
        batch_size, seq_len, orig_ch = tensor.shape
        positions = torch.arange(seq_len, device=tensor.device, dtype=self.inv_freq.dtype)
        # Outer product position x frequency, then interleave sin/cos pairs
        # along the channel axis.
        angles = positions[:, None] * self.inv_freq[None, :]
        emb_x = torch.stack((angles.sin(), angles.cos()), dim=-1).flatten(-2, -1)
        emb = torch.zeros((seq_len, self.channels), device=tensor.device, dtype=tensor.dtype)
        emb[:, : self.channels] = emb_x
        # Truncate any padding channel and broadcast over the batch.
        self.cached_penc = emb[None, :, :orig_ch].repeat(batch_size, 1, 1)
        return self.cached_penc

class ClassificationHead(nn.Module):
    """Maps encoder output (batch, seq_len, d_model) to (batch, n_classes) logits.

    Pipeline: LayerNorm -> flatten seq and feature dims -> ReLU -> Dropout ->
    a single Linear projection whose weights are re-initialized from a
    unit-variance Gaussian.
    """

    def __init__(self, d_model, seq_len=window, n_classes=vocab_size):
        super(ClassificationHead, self).__init__()
        self.norm = nn.LayerNorm(d_model)
        self.linear = nn.Linear(d_model * seq_len, n_classes)
        self.seq = nn.Sequential(
            nn.Flatten(),
            nn.ReLU(),
            nn.Dropout(0.2),
            self.linear,
        )
        # Replace the default Linear init with plain N(0, 1) weights.
        with torch.no_grad():
            self.linear.weight.copy_(torch.randn(n_classes, d_model * seq_len))

    def forward(self, x):
        return self.seq(self.norm(x))

class EncoderDecoder(nn.Module):
    """Causal transformer encoder over a window of ratio bins plus a
    classification head predicting the next bin.

    Input: (batch, window) LongTensor of bin ids when `using_embedding` is
    True, otherwise (batch, window, vocab_size) one-hot floats.
    Output: (batch, vocab_size) logits.
    """

    def __init__(self, d_model, num_layers, dropout=0.2):
        super(EncoderDecoder, self).__init__()
        self.model_type = 'Transformer'
        self.src_mask = None  # causal mask, built lazily on first forward
        if using_embedding:
            self.input_embedding = torch.nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)
        else:
            self.input_embedding = nn.Linear(
                in_features=vocab_size,
                out_features=d_model
            )
        self.pos_encoder = PositionalEncoding1D(d_model)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model, nhead=10, dropout=dropout, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.decoder = ClassificationHead(d_model=d_model)

    def forward(self, x):
        if self.src_mask is None:
            # BUG FIX: the mask was always built on the CPU; build it on the
            # input's device so the model also works when moved to a GPU.
            # NOTE: the mask is cached for the first sequence length seen.
            self.src_mask = self._generate_square_subsequent_mask(x.shape[1]).to(x.device)
        x = self.input_embedding(x)
        # BUG FIX: PositionalEncoding1D.forward returns ONLY the positional
        # signal. The old code did `x = self.pos_encoder(x)`, which REPLACED
        # the embedded input with the position encoding — the encoder never
        # saw the data at all, only positions, so the model could only learn
        # a constant output distribution (consistent with the reported
        # off-by-one "categorization"). Positional encodings must be ADDED.
        x = x + self.pos_encoder(x)
        x = self.transformer_encoder(x, self.src_mask)
        x = self.decoder(x)
        return x

    def _generate_square_subsequent_mask(self, sz):
        # Additive causal mask: 0.0 on/below the diagonal, -inf strictly
        # above (same values the original triu/transpose/masked_fill dance
        # produced, built in one step).
        return torch.triu(torch.full((sz, sz), float('-inf')), diagonal=1)

# Multi-class loss over the vocab_size ratio bins; takes raw (unsoftmaxed) logits.
criterion = nn.CrossEntropyLoss()

def train3(model, training_by_class, batching, passes):
    """Train `model` with class-balanced batches.

    Each pass draws one random frame from every non-empty class bucket in
    `training_by_class`, so the batch size equals the number of populated
    classes; the process exits if that count differs from `batching`.
    Saves the loss curve to ./local/losses.png when done.
    """
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    model.train()
    losses = []
    for i in range(passes):
        data = []
        target = []
        for clazz in range(0, vocab_size):
            bucket = training_by_class[clazz]
            if len(bucket) > 0:
                frame = bucket[random.randrange(0, len(bucket))]
                past = torch.tensor(frame.past)
                # CONSISTENCY FIX: validate4 one-hot-encodes its input when
                # `using_embedding` is False, but training always fed raw bin
                # ids (which would crash the Linear input layer). Mirror
                # validate4's encoding here so both modes actually work.
                if not using_embedding:
                    past = torch.nn.functional.one_hot(past, num_classes=vocab_size).float()
                data.append(past)
                target.append(frame.pred)
        if len(data) != batching:
            print(str(len(data)) + " != " + str(batching))
            sys.exit(1)
        data = torch.stack(data)
        # torch.tensor on a list of ints builds the same 1-D LongTensor the
        # old stack-of-scalars produced.
        target = torch.tensor(target)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        print("pass: " + str(i+1) + "/" + str(passes) + " loss: " + str(loss.item()))
        losses.append(loss.item())
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.7)
        optimizer.step()
    plt.plot(losses)
    plt.savefig("./local/losses.png")

def validate4(model, validation):
    """Score `model` on `validation` frames.

    Returns (hits, total, too_hi, too_lo) where too_hi[k] / too_lo[k] count
    predictions that landed k bins above / below the true bin.
    """
    hits = 0
    too_hi = [0] * vocab_size
    too_lo = [0] * vocab_size
    model.eval()
    with torch.no_grad():
        for frame in validation:
            past = torch.tensor(frame.past)
            if not using_embedding:
                past = torch.nn.functional.one_hot(past, num_classes=vocab_size).float()
            guess = model(torch.unsqueeze(past, 0))[0].argmax().item()
            delta = guess - frame.pred
            if delta > 0:
                too_hi[delta] += 1
            elif delta < 0:
                too_lo[-delta] += 1
            else:
                hits += 1
    return (hits, len(validation), too_hi, too_lo)

def to_mag(r):
    """Map a price ratio `r` to its bin index.

    Bins 0..len(mags_lower)-1 cover ratios below 1 (higher index = closer
    to 1); bins len(mags_lower)..len(mags_lower)+len(mags_upper)-1 cover
    ratios of at least 1 (lower index = closer to 1). Out-of-range ratios
    clamp to the outermost bin on their side.
    """
    if r < 1:
        # mags_lower is descending, so the first threshold <= r wins.
        idx = next((i for i, lo in enumerate(mags_lower) if r >= lo),
                   len(mags_lower) - 1)
        return len(mags_lower) - idx - 1
    # mags_upper is ascending, so the first threshold >= r wins.
    idx = next((i for i, hi in enumerate(mags_upper) if r <= hi),
               len(mags_upper) - 1)
    return len(mags_lower) + idx

class DataFrame():
    """One example: `past` is the window of adjacent-price ratio bins,
    `pred` is the bin of the next price relative to the window's last price."""

    def __init__(self, i_past, i_pred):
        # Bin every adjacent-price ratio inside the window.
        self.past = [to_mag(nxt / cur) for cur, nxt in zip(i_past, i_past[1:])]
        self.pred = to_mag(i_pred / i_past[-1])

def main():
    """Load ./local/yahoo.csv, build class-bucketed training data and a
    validation tail, train the model, and print/check the validation score."""
    s_open = []
    s_high = []
    s_low = []
    s_close = []
    with open('./local/yahoo.csv') as csvfile:
        cr = csv.reader(csvfile)
        first = True
        for row in cr:
            if first:
                first = False  # skip the CSV header row
                continue
            x_date, x_open, x_high, x_low, x_close, x_ignored_2, x_ignored_3 = row
            s_open.append(float(x_open))
            s_high.append(float(x_high))
            s_low.append(float(x_low))
            s_close.append(float(x_close))
    dims = window + 1
    cutoff = (len(s_close) * 7) // 8  # first 7/8 of the series is training data
    # BUG FIX: `[[]] * vocab_size` creates vocab_size references to ONE shared
    # list, so every frame was appended to every "bucket" and train3's
    # per-class balanced sampling silently degraded to uniform sampling over
    # the whole training set. Build independent lists instead.
    training_by_class = [[] for _ in range(vocab_size)]
    for i in range(0, cutoff - dims):
        df = DataFrame(s_close[i:i+dims], s_close[i+dims])
        training_by_class[df.pred].append(df)
    validation = []
    for i in range(cutoff - dims, len(s_close) - dims):
        validation.append(DataFrame(s_close[i:i+dims], s_close[i+dims]))
    model = EncoderDecoder(features, layers)
    # With distinct buckets, the padded classes (fudge) and any bins unseen in
    # training stay empty, so the batch size is the number of populated
    # classes — not the global `batching`, which counts padded classes too.
    populated = sum(1 for bucket in training_by_class if bucket)
    train3(model, training_by_class, populated, passes)
    res = validate4(model, validation)
    print(res)
    # NOTE(review): 102 is a magic threshold from one previous "good" run;
    # confirm it is still the intended regression check.
    if res[0] != 102:
        sys.exit(1)

# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()

"""
(102, 315, [0, 57, 49, 27, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 70, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
(57, 315, [0, 49, 27, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 102, 70, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
"""
``````

I think the problem was too low a value for `passes`. When I increase `passes` tenfold, the off-by-one behavior occurs much less frequently (but of course everything takes longer).

Sorry for the spam!