My project directory looks like this:
├── src
│   ├── main.py
│   ├── dataset.py
│   ├── model.py
│   └── __init__.py
└── train.py
In train.py
import torch
import torchvision
import numpy as np
import tensorboard
from src.main import MNIST
import argparse
parser = argparse.ArgumentParser(description="Trainning Siamese Network")
parser.add_argument('--batch_size',default=16)
parser.add_argument('--nepochs',default=64)
parser.add_argument('--ckpt_path',default='./ckpt/model.pth')
args = parser.parse_args()
def main():
model = MNIST(args=args)
model.train()
if __name__ == "__main__":
main()
In dataset.py
import torch
import numpy as np
import random
class Dataset(torch.utils.data.Dataset):
def __init__ (self, data, transform):
self.data = data
self.transform = transform
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
idx_1 = np.random.randint(len(self.data), size=1)[0]
img_1 = self.data[idx_1]
label_1 = img_1[1]
match = random.choice([True,False])
idx_2 = np.random.randint(len(self.data), size=1)[0]
img_2 = self.data[idx_2]
label_2 = img_2[1]
while match and (label_1 != label_2):
idx_2 = np.random.randint(len(self.data), size=1)[0]
img_2 = self.data[idx_2]
label_2 = img_2[1]
while not match and (label_1 == label_2):
idx_2 = np.random.randint(len(self.data), size=1)[0]
img_2 = self.data[idx_2]
label_2 = img_2[1]
x1 = self.transform(img_1[0])
x2 = self.transform(img_2[0])
if match:
y = torch.ones(1)
else :
y = torch.zeros(1)
return x1, x2, y
In model.py
import torch
import torch.nn as nn
from torch.nn import Conv2d , MaxPool2d, BatchNorm2d
class Siamese(nn.Module):
def __init__ (self):
super(Siamese,self).__init__()
self.encoder = nn.Sequential(
Conv2d_block(1,32, kernel_size=(3,3),stride=1,padding=1),
Conv2d_block(32,64, kernel_size=(3,3),stride=1,padding=0),
MaxPool2d(3,(2,2)),
Conv2d_block(64,128, kernel_size=(3,3),stride=1,padding=1),
Conv2d_block(128,258, kernel_size=(3,3),stride=1,padding=0),
MaxPool2d(3,(2,2)),
Conv2d_block(258,512, kernel_size=(3,3),stride=1,padding=1),
MaxPool2d(3,(2,2)),
)
def forward(self, img1 , img2):
e1 = self.encoder(img1)
e2 = self.encoder(img2)
e1 = nn.functional.normalize(e1,p=2,dim=1)
e2 = nn.functional.normalize(e2,p=2,dim=1)
return e1,e2
class Conv2d_block(nn.Module):
def __init__(self, cin , cout, kernel_size, stride, padding) :
super(Conv2d_block,self).__init__()
self.conv = Conv2d(cin,cout,kernel_size=kernel_size, stride=stride, padding=padding)
self.batchnorm = BatchNorm2d(cout)
self.act = nn.ReLU()
self.dropout = nn.Dropout2d(0.25)
def forward(self, input):
out = self.conv(input)
out = self.batchnorm(out)
out = self.act(out)
out = self.dropout(out)
return out
In main.py
import torch
import torch.nn as nn
import torchvision
import os
import random
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from torchvision.transforms import ToTensor
from torchvision import transforms
from src.dataset import Dataset
from src.model import Siamese
device = "cuda" if torch.cuda.is_available else "cpu"
class MNIST():
def __init__(self,args):
self.batch_size = args.batch_size
self.ckpt_path = args.ckpt_path
self.epochs = 0
self.nepochs = args.nepochs
self.transform = transforms.Compose([
transforms.RandomHorizontalFlip(),
transforms.RandomVerticalFlip(),
transforms.ToTensor(),
])
self.train_set = torchvision.datasets.MNIST(root= '../dataset/', train=True, download=True,)
self.train_set = Dataset(self.train_set,self.transform)
self.test_set = torchvision.datasets.MNIST(root= '../dataset/', train=False, download=True,)
self.test_set = Dataset(self.test_set,self.transform)
self.train_loader = torch.utils.data.DataLoader(dataset = self.train_set,
batch_size = self.batch_size,
shuffle=True,
num_workers=4,
)
self.test_loader = torch.utils.data.DataLoader(dataset = self.test_set,
batch_size = self.batch_size,
shuffle=True,
num_workers=4,
)
self.model = Siamese().to(device)
self.bce = nn.BCELoss()
self.optimizer = torch.optim.Adam([params for params in self.model.parameters() if params.requires_grad],lr=0.001)
def __get_loss__(self,e1,e2,y):
sim = torch.nn.functional.cosine_similarity(e1,e2)
sim = sim.squeeze(1)
loss = self.bce(sim,y)
sim = sim.detach().clone().cpu().numpy()
y = y.detach().clone().cpu().numpy()
pred = [1 if i > 0.5 else 0 for i in sim]
gt = [1 if i > 0.5 else 0 for i in y]
acc = sum([1 if y == g else 0 for y, g in zip(pred,gt)]) /len(gt)
return loss , acc
def __train_stage__(self):
running_loss = 0.
running_acc = 0.
iter_inbatch = 0
progress_bar = tqdm(self.train_loader)
self.model.train()
for x1, x2, y in progress_bar:
self.optimizer.zero_grad()
x1 = x1.to(device)
x2 = x2.to(device)
y = y.to(device)
e1 , e2 = self.model(x1,x2)
loss, acc = self.__get_loss__(e1,e2,y)
loss.backward()
self.optimizer.step()
running_loss +=loss
running_acc += acc
iter_inbatch +=1
progress_bar.set_description(" Train Epochs : {} , Loss : {} , ACC : {} ".format( self.epochs , (running_loss/iter_inbatch), (running_acc/iter_inbatch)))
self.epochs +=1
avg_loss = running_loss / iter_inbatch
avg_acc = running_acc / iter_inbatch
return avg_loss, avg_acc
def __eval_stage__(self):
running_loss = 0.
running_acc = 0.
iter_inbatch = 0
progress_bar = tqdm(self.test_loader)
with torch.no_grad():
self.model.eval()
for x1, x2, y in progress_bar:
x1 = x1.to(device)
x2 = x2.to(device)
y = y.to(device)
e1 , e2 = self.model(x1,x2)
loss, acc = self.__get_loss__(e1,e2,y)
running_loss +=loss
running_acc += acc
iter_inbatch +=1
progress_bar.set_description("Eval Epochs : {} , Loss : {:.3f} , ACC : {:.3f} "
.format( self.epochs , (running_loss/iter_inbatch), (running_acc/iter_inbatch)))
avg_loss = running_loss / iter_inbatch
avg_acc = running_acc / iter_inbatch
return avg_loss, avg_acc
def __save_model__(self, path):
torch.save(self.model.state_dict(),path)
print("save model at path {}".format(path))
def train(self):
while self.epochs < self.nepochs:
train_loss , train_acc = self.__train_stage__()
eval_loss , eval_acc = self.__eval_stage__()
self.__save_model__(path=self.ckpt_path)