# THE MODEL
import statistics

import torch
import torch.nn as nn
from einops import rearrange
def conv_1x1_bn(inp, oup):
    """Build a pointwise (1x1) convolution followed by batch norm and SiLU.

    Args:
        inp: Number of input channels.
        oup: Number of output channels.

    Returns:
        An ``nn.Sequential`` of bias-free ``Conv2d`` -> ``BatchNorm2d`` -> ``SiLU``.
    """
    pointwise = nn.Conv2d(inp, oup, kernel_size=1, stride=1, padding=0, bias=False)
    normalize = nn.BatchNorm2d(oup)
    activate = nn.SiLU()
    return nn.Sequential(pointwise, normalize, activate)
def conv_nxn_bn(inp, oup, kernal_size=3, stride=1):
    """Build an NxN convolution followed by batch norm and SiLU.

    The padding is derived from the kernel size (``k // 2``) so that, for odd
    kernels and ``stride=1``, the spatial size of the input is preserved. For
    the default 3x3 kernel this is the same padding of 1 used previously.

    Args:
        inp: Number of input channels.
        oup: Number of output channels.
        kernal_size: Kernel size, either an int or a tuple of ints (the
            misspelled name is kept so existing keyword callers keep working).
        stride: Convolution stride.

    Returns:
        An ``nn.Sequential`` of bias-free ``Conv2d`` -> ``BatchNorm2d`` -> ``SiLU``.
    """
    if isinstance(kernal_size, int):
        padding = kernal_size // 2
    else:
        padding = tuple(k // 2 for k in kernal_size)
    return nn.Sequential(
        nn.Conv2d(inp, oup, kernal_size, stride, padding, bias=False),
        nn.BatchNorm2d(oup),
        nn.SiLU(),
    )
class PreNorm(nn.Module):
    """Apply ``LayerNorm`` over the last dimension, then call a wrapped callable.

    Args:
        dim: Size of the normalized (last) dimension.
        fn: Callable invoked on the normalized tensor.
    """

    def __init__(self, dim, fn):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.fn = fn

    def forward(self, x, **kwargs):
        """Normalize ``x`` and forward it, with any extra keyword args, to ``fn``."""
        normalized = self.norm(x)
        return self.fn(normalized, **kwargs)
class FeedForward(nn.Module):
    """Two-layer MLP: expand to ``hidden_dim``, SiLU, project back to ``dim``.

    Args:
        dim: Input and output feature size.
        hidden_dim: Width of the hidden layer.
        dropout: Dropout probability applied after the activation and after
            the output projection.
    """

    def __init__(self, dim, hidden_dim, dropout=0.):
        super().__init__()
        expand = nn.Linear(dim, hidden_dim)
        activation = nn.SiLU()
        hidden_drop = nn.Dropout(dropout)
        project = nn.Linear(hidden_dim, dim)
        output_drop = nn.Dropout(dropout)
        self.net = nn.Sequential(expand, activation, hidden_drop, project, output_drop)

    def forward(self, x):
        """Run ``x`` through the MLP; the last dimension must equal ``dim``."""
        return self.net(x)
class Attention(nn.Module):
    """Multi-head scaled dot-product self-attention over the token axis.

    The input is laid out as ``b p n (h d)`` (see the rearrange patterns in
    ``forward``): attention is computed along ``n`` independently for every
    ``b`` and ``p``.

    Args:
        dim: Feature size of the input and output.
        heads: Number of attention heads.
        dim_head: Feature size of each head.
        dropout: Dropout probability applied after the output projection.
    """

    def __init__(self, dim, heads=8, dim_head=64, dropout=0.):
        super().__init__()
        inner_dim = heads * dim_head

        self.heads = heads
        self.scale = dim_head ** -0.5
        self.attend = nn.Softmax(dim=-1)
        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias=False)

        # A single head whose width already equals `dim` needs no projection.
        if heads == 1 and dim_head == dim:
            self.to_out = nn.Identity()
        else:
            self.to_out = nn.Sequential(
                nn.Linear(inner_dim, dim),
                nn.Dropout(dropout),
            )

    def forward(self, x):
        """Return the attended tensor, with the same layout as ``x``."""
        q, k, v = (
            rearrange(part, 'b p n (h d) -> b p h n d', h=self.heads)
            for part in self.to_qkv(x).chunk(3, dim=-1)
        )
        scores = torch.matmul(q, k.transpose(-1, -2)) * self.scale
        weights = self.attend(scores)
        merged = rearrange(torch.matmul(weights, v), 'b p h n d -> b p n (h d)')
        return self.to_out(merged)
class Transformer(nn.Module):
    """Stack of pre-norm attention + feed-forward layers with residual sums.

    Args:
        dim: Feature size of the tokens.
        depth: Number of (attention, feed-forward) layer pairs.
        heads: Number of attention heads per layer.
        dim_head: Feature size of each attention head.
        mlp_dim: Hidden width of each feed-forward block.
        dropout: Dropout probability passed to both sub-blocks.
    """

    def __init__(self, dim, depth, heads, dim_head, mlp_dim, dropout=0.):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.ModuleList([
                PreNorm(dim, Attention(dim, heads, dim_head, dropout)),
                PreNorm(dim, FeedForward(dim, mlp_dim, dropout)),
            ])
            for _ in range(depth)
        ])

    def forward(self, x):
        """Apply every layer pair in order, adding each sub-block's output to its input."""
        for attention, feed_forward in self.layers:
            x = attention(x) + x
            x = feed_forward(x) + x
        return x
class MV2Block(nn.Module):
    """Inverted-residual block: optional 1x1 expand, 3x3 depthwise, 1x1 linear.

    Args:
        inp: Number of input channels.
        oup: Number of output channels.
        stride: Stride of the depthwise convolution; must be 1 or 2.
        expansion: Channel multiplier for the hidden width. When it equals 1
            the pointwise expansion stage is omitted.
    """

    def __init__(self, inp, oup, stride=1, expansion=4):
        super().__init__()
        self.stride = stride
        assert stride in [1, 2]

        hidden_dim = int(inp * expansion)
        # The skip connection is only valid when the output shape equals the input's.
        self.use_res_connect = self.stride == 1 and inp == oup

        stages = []
        if expansion != 1:
            # pw: expand to the hidden width
            stages += [
                nn.Conv2d(inp, hidden_dim, 1, 1, 0, bias=False),
                nn.BatchNorm2d(hidden_dim),
                nn.SiLU(),
            ]
        stages += [
            # dw: one 3x3 filter per channel
            nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
            nn.BatchNorm2d(hidden_dim),
            nn.SiLU(),
            # pw-linear: project to the output width, no activation
            nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
            nn.BatchNorm2d(oup),
        ]
        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Return ``x + conv(x)`` when the residual applies, otherwise ``conv(x)``."""
        if not self.use_res_connect:
            return self.conv(x)
        return x + self.conv(x)
class MobileViTBlock(nn.Module):
    """Local convolutions, a transformer over unfolded patches, and a fusion conv.

    Args:
        dim: Token feature size used inside the transformer.
        depth: Number of transformer layers.
        channel: Number of channels of the block's input and output.
        kernel_size: Kernel size of the two NxN convolutions.
        patch_size: ``(ph, pw)`` patch height and width used to unfold the map.
        mlp_dim: Hidden width of the transformer feed-forward blocks.
        dropout: Dropout probability passed to the transformer.
    """

    def __init__(self, dim, depth, channel, kernel_size, patch_size, mlp_dim, dropout=0.):
        super().__init__()
        self.ph, self.pw = patch_size

        self.conv1 = conv_nxn_bn(channel, channel, kernel_size)
        self.conv2 = conv_1x1_bn(channel, dim)

        # The transformer always uses 4 heads of width 8.
        self.transformer = Transformer(dim, depth, 4, 8, mlp_dim, dropout)

        self.conv3 = conv_1x1_bn(dim, channel)
        self.conv4 = conv_nxn_bn(2 * channel, channel, kernel_size)

    def forward(self, x):
        """Return a feature map with ``channel`` channels computed from ``x``."""
        skip = x.clone()

        # Local representations
        local = self.conv2(self.conv1(x))

        # Global representations: unfold into patches, attend, fold back.
        _, _, height, width = local.shape
        tokens = rearrange(local, 'b d (h ph) (w pw) -> b (ph pw) (h w) d', ph=self.ph, pw=self.pw)
        tokens = self.transformer(tokens)
        rows = height // self.ph
        cols = width // self.pw
        folded = rearrange(tokens, 'b (ph pw) (h w) d -> b d (h ph) (w pw)', h=rows, w=cols, ph=self.ph, pw=self.pw)

        # Fusion: concatenate with the untouched input along the channel axis.
        fused = torch.cat((self.conv3(folded), skip), 1)
        return self.conv4(fused)
class MobileViT(nn.Module):
    """Four-stream MobileViT classifier with a sigmoid output head.

    Each of the four 3-channel inputs (raw, depth, Luv, YCrCb) goes through
    its own stride-2 ``MV2Block`` stem. The stem outputs are concatenated
    along the channel axis and processed by a shared trunk of ``MV2Block``
    and ``MobileViTBlock`` stages, global average pooling and a linear layer.

    Args:
        image_size: ``(height, width)`` of the inputs; only used to check
            divisibility by ``patch_size``.
        dims: Three transformer token sizes, one per ``MobileViTBlock``.
        channels: Three channel widths: stem output, middle stages, final stages.
        num_classes: Number of outputs of the linear head.
        expansion: Expansion factor passed to every ``MV2Block``.
        kernel_size: Kernel size passed to every ``MobileViTBlock``.
        patch_size: ``(ph, pw)`` patch size passed to every ``MobileViTBlock``.
    """

    def __init__(self, image_size, dims, channels, num_classes, expansion=4, kernel_size=3, patch_size=(2, 2)):
        super().__init__()
        ih, iw = image_size
        ph, pw = patch_size
        assert ih % ph == 0 and iw % pw == 0

        depths = [2, 4, 3]  # transformer layers in each MobileViTBlock

        # One stem per input stream; each takes a 3-channel image.
        self.layer_raw = MV2Block(3, channels[0], 2, expansion)
        self.layer_depth = MV2Block(3, channels[0], 2, expansion)
        self.layer_Luv = MV2Block(3, channels[0], 2, expansion)
        self.layer_YCrCb = MV2Block(3, channels[0], 2, expansion)

        # The four stem outputs are concatenated, hence 4 * channels[0] inputs
        # (previously hard-coded as 32 * 4, which only matched channels[0] == 32).
        self.conv1 = conv_nxn_bn(4 * channels[0], channels[1], stride=2)

        self.mv2 = nn.ModuleList([])
        self.mv2.append(MV2Block(channels[1], channels[1], 1, expansion))
        self.mv2.append(MV2Block(channels[1], channels[1], 2, expansion))
        self.mv2.append(MV2Block(channels[1], channels[1], 1, expansion))
        self.mv2.append(MV2Block(channels[1], channels[1], 1, expansion))  # Repeat
        self.mv2.append(MV2Block(channels[1], channels[1], 2, expansion))
        self.mv2.append(MV2Block(channels[1], channels[2], 2, expansion))
        self.mv2.append(MV2Block(channels[2], channels[2], 2, expansion))

        self.mvit = nn.ModuleList([])
        self.mvit.append(MobileViTBlock(dims[0], depths[0], channels[1], kernel_size, patch_size, int(dims[0]*2)))
        self.mvit.append(MobileViTBlock(dims[1], depths[1], channels[2], kernel_size, patch_size, int(dims[1]*4)))
        self.mvit.append(MobileViTBlock(dims[2], depths[2], channels[2], kernel_size, patch_size, int(dims[2]*4)))

        self.conv2 = conv_1x1_bn(channels[2], channels[2])

        # Not used by forward(), which pools over the full feature map instead;
        # kept so the module's attributes stay unchanged.
        self.pool = nn.AvgPool2d(4, 1)
        # Head sized from the arguments (previously the fixed nn.Linear(256, 1),
        # which ignored num_classes and only matched channels[2] == 256).
        self.linear = nn.Linear(channels[2], num_classes)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Classify a group of four aligned image tensors.

        Args:
            x: Indexable of four tensors in the order raw, depth, Luv, YCrCb,
                each with 3 channels.

        Returns:
            Tensor of shape ``(batch, num_classes)`` with sigmoid-activated
            values (probabilities, not logits).
        """
        raw = self.layer_raw(x[0])
        depth = self.layer_depth(x[1])
        luv = self.layer_Luv(x[2])
        ycrcb = self.layer_YCrCb(x[3])
        x = torch.cat((raw, depth, luv, ycrcb), 1)

        x = self.conv1(x)
        x = self.mv2[0](x)

        x = self.mv2[1](x)
        x = self.mv2[2](x)
        x = self.mv2[3](x)  # Repeat

        x = self.mv2[4](x)
        x = self.mvit[0](x)

        x = self.mv2[5](x)
        x = self.mvit[1](x)

        x = self.mv2[6](x)
        x = self.mvit[2](x)
        x = self.conv2(x)

        # Global average pooling over whatever spatial size is left. Called via
        # nn.functional because this file never imports it under the alias `F`.
        x = nn.functional.avg_pool2d(x, kernel_size=x.shape[2:]).flatten(1)
        return self.sigmoid(self.linear(x))
```
Training/val Functions
def validate(model, loss_fn, optimizer):
model.eval()
predictions = []
with torch.no_grad():
validation_batch_losses = []
for (raw,depth,Luv,YCrCb, labels) in val_loader:
raw = raw.reshape(len(raw),3,256,256)
raw =raw/127.5 -1
raw = raw.to(device)
depth = depth.reshape(len(depth),3,256,256)
depth =depth/127.5 -1
depth = depth.to(device)
Luv = Luv.reshape(len(Luv),3,256,256)
Luv =Luv/127.5 -1
Luv = Luv.to(device)
YCrCb = YCrCb.reshape(len(YCrCb),3,256,256)
YCrCb =YCrCb/127.5 -1
YCrCb = YCrCb.to(device)
labels = labels.to(device).to(torch.float32)
images =[raw,depth,Luv,YCrCb]
labels_pred = model(images).squeeze(1)
loss = loss_fn(labels_pred, labels)
validation_batch_losses.append(float(loss))
mean_loss = statistics.mean(validation_batch_losses)
return mean_loss
def accuracy(model, loader):
correct = 0
total = 0
model.eval()
with torch.no_grad():
for (raw,depth,Luv,YCrCb, labels) in train_loader:
raw = raw.reshape(len(raw),3,256,256)
raw =raw/127.5 -1
raw = raw.to(device)
depth = depth.reshape(len(depth),3,256,256)
depth =depth/127.5 -1
depth = depth.to(device)
Luv = Luv.reshape(len(Luv),3,256,256)
Luv =Luv/127.5 -1
Luv = Luv.to(device)
YCrCb = YCrCb.reshape(len(YCrCb),3,256,256)
YCrCb =YCrCb/127.5 -1
YCrCb = YCrCb.to(device)
labels = labels.to(device)
images =[raw,depth,Luv,YCrCb]
labels_pred = model(images)
_, predicted = torch.max(labels_pred.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
return (100* correct / total)
def train(model, loss_fn, optimizer):
model.train()
train_batch_losses = []
for (raw,depth,Luv,YCrCb, labels) in train_loader:
raw = raw.reshape(len(raw),3,256,256)
raw =raw/127.5 -1
raw = raw.to(device)
depth = depth.reshape(len(depth),3,256,256)
depth =depth/127.5 -1
depth = depth.to(device)
Luv = Luv.reshape(len(Luv),3,256,256)
Luv =Luv/127.5 -1
Luv = Luv.to(device)
YCrCb = YCrCb.reshape(len(YCrCb),3,256,256)
YCrCb = YCrCb/127.5 -1
YCrCb = YCrCb.to(device)
labels = labels.to(device).to(torch.float32)
images =[raw,depth,Luv,YCrCb]
optimizer.zero_grad()
y_pred = model(images).squeeze(1)
loss = loss_fn(y_pred, labels)
loss.backward()
optimizer.step()
train_batch_losses.append(float(loss))
mean_loss = statistics.mean(train_batch_losses)
return mean_loss
```
# The rest
# Hyper-parameters.
epochs = 20
batch_size = 10
learning_rate = 0.00001

# Model configuration: transformer token sizes and channel widths per stage.
dims = [144, 192, 240]
channels = [32, 128, 256]
model = MobileViT((256, 256), dims, channels, num_classes=1).to(device)
print(count_parameters(model))

# The model already ends in a sigmoid, so BCELoss (not BCEWithLogitsLoss) applies.
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# Multiplies the learning rate by 0.1 every 4 epochs.
exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=4, gamma=0.1)

n_total_steps = len(train_loader)
train_losses = []
valid_losses = []
train_accuracies = []
valid_accuracies = []

for epoch in range(1, epochs + 1):
    print('Epoch: ', epoch)

    train_loss = train(model, criterion, optimizer)
    train_accuracy = accuracy(model, train_loader)
    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)
    print('Training loss:', train_loss)
    print('Training accuracy: {}%'.format(train_accuracy))

    valid_loss = validate(model, criterion, optimizer)
    valid_accuracy = accuracy(model, val_loader)
    valid_losses.append(valid_loss)
    valid_accuracies.append(valid_accuracy)
    print('Validation loss:', valid_loss)
    print('Validation accuracy: {}%'.format(valid_accuracy))

    # The scheduler was created but never advanced, so the learning rate
    # never decayed. Step it once per epoch, after the optimizer updates.
    exp_lr_scheduler.step()
OUTPUT:
Epoch: 1
Training loss: 0.7023254632949829
Training accuracy: 50.0%
Validation loss: 0.6932021677494049
Validation accuracy: 50.0%
Epoch: 2
Training loss: 0.5930162966251373
Training accuracy: 50.0%
Validation loss: 0.6931731104850769
Validation accuracy: 50.0%
Epoch: 3
Training loss: 0.5212235152721405
Training accuracy: 50.0%
Validation loss: 0.6931508183479309
Validation accuracy: 50.0%
Epoch: 4
Training loss: 0.4801531434059143
Training accuracy: 50.0%
Validation loss: 0.6931528747081757
Validation accuracy: 50.0%
I have tried different learning rates and different accuracy functions without success.
Thanks