I decided to implement a random sampler by myself. As I am testing my own sampling strategy against random sampling and other sampling mechanisms. I tried to implement by random sampling strategy based on RandomSampler method. But the results are vastly different I am getting a test accuracy of 15%. But the expected test accuracy is 95%. I can’t figure out what I am doing wrong.
import torch
print("torch.__version__", torch.__version__)
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
# Create a Random Sampler
class sampler():
def __init__(self, dataset, batch_size):
self.dataset = dataset
self.batch_size = batch_size
def __iter__(self):
indices = torch.randperm (len (self.dataset))
for i in range (0, self.dataset.shape[0] // self.batch_size):
yield i, indices[i * self.batch_size: (i + 1) * self.batch_size]
# Create the Network
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
"""
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
elif isinstance(m, nn.Linear):
nn.init.xavier_uniform_(m.weight)
m.bias.data.fill_(0.01)
"""
def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
feat_x = x
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x, dim=1), feat_x
def train(model, device, sampler, optimizer, epoch):
model.train()
for batch_idx, idx in sampler:
data, target = org_data[idx], org_target[idx]
data = data.to(device)
target = target.to(device)
optimizer.zero_grad()
output, feat = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % 1000 == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_loader.dataset),
100. * batch_idx / len(train_loader), loss.item()))
def train_normal(model, device, train_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data = data.to(device)
target = target.to(device)
optimizer.zero_grad()
output, feat = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % 1000 == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_loader.dataset),
100. * batch_idx / len(train_loader), loss.item()))
def test(model, device, test_loader, epoch):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data = data.to(device)
target = target.to(device)
output, _ = model(data)
test_loss += F.nll_loss (output, target, size_average=False).item () # sum up batch loss
pred = output.max (1, keepdim=True)[1] # get the index of the max log-probability
correct += pred.eq (target.view_as (pred)).sum ().item ()
test_loss /= len(test_loader.dataset)
print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
test_loss, correct, len(test_loader.dataset),
100. * correct / len(test_loader.dataset)))
def main(normal):
# Read Dataset
dataset = datasets.MNIST("./mnist", train=True, download=False,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
org_data, org_target = dataset.train_data, dataset.train_labels
org_data = org_data.unsqueeze(1).type(torch.FloatTensor)
test_dataset = datasets.MNIST("./mnist", train=True, download=False,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
train_loader = torch.utils.data.DataLoader(dataset, batch_size=1000, shuffle=False)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1000, shuffle=False)
# Training loop
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
train_batch_size = 64
test_batch_size = 1000
learning_rate = 0.01
momentum = 0.5
epochs = 10
sampling_fnc = sampler(org_data, train_batch_size)
model = Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
for i in range(epochs):
if normal==1:
train_normal(model, device, train_loader, optimizer, i)
else:
train(model, device, sampling_fnc, optimizer, i)
test(model, device, test_loader, i)
if __name__ == "__main__":
main(0)
main(1)