1D data Surrogate Gradient Descent in a Convolutional Spiking Neural Network

I have customized 1d binary datasets classes and then trying to apply surrogate gradient descent spike model and trying it to run the code on cuda, I am not able to implement the data for conv2d so I tried it with conv1d but getting too many errors repeatedly. Looking for Help.

imports

import multiprocessing
import csv
import os
import sys

import snntorch as snn
from snntorch import surrogate
from snntorch import backprop
from snntorch import functional as SF
from snntorch import utils
from snntorch import spikeplot as splt

import torch
import torch.nn as nn
import torch.optim as opt
import pytorch_optimizer as optim
from torch.utils.data import DataLoader
import torch.utils.data as IterableDataset
from torchvision import datasets, transforms
import torch.nn.functional as F
from torch.utils.checkpoint import checkpoint

import random
import math

import matplotlib.pyplot as plt
import numpy as np
import itertools

Leaky neuron model, overriding the backward pass with a custom function

class LeakySigmoidSurrogate(nn.Module):
def init(self, beta, threshold=1.0, k=25):
super(Leaky_Surrogate, self).init()

    # initialize decay rate beta and threshold
    self.beta = beta
    self.threshold = threshold
    self.surrogate_func = self.FastSigmoid.apply

# the forward function is called each time we call Leaky
def forward(self, input_, mem):
    spk = self.surrogate_func((mem-self.threshold))  # call the Heaviside function
    reset = (spk - self.threshold).detach()
    mem = self.beta * mem + input_ - reset
    return spk, mem

# Forward pass: Heaviside function
# Backward pass: Override Dirac Delta with gradient of fast sigmoid
@staticmethod
class FastSigmoid(torch.autograd.Function):
    @staticmethod
    def forward(ctx, mem, k=25):
        ctx.save_for_backward(mem) # store the membrane potential for use in the backward pass
        ctx.k = k
        out = (mem > 0).float() # Heaviside on the forward pass: Eq(1)
        return out

    @staticmethod
    def backward(ctx, grad_output):
        (mem,) = ctx.saved_tensors  # retrieve membrane potential
        grad_input = grad_output.clone()
        grad = grad_input / (ctx.k * torch.abs(mem) + 1.0) ** 2  # gradient of fast sigmoid on backward pass: Eq(4)
        return grad, None

spike_grad = surrogate.fast_sigmoid(slope=25)
beta = 0.5

lif1 = snn.Leaky(beta=beta, spike_grad=spike_grad)

#crypto Script
net_inputs = 32
net_outputs = 4
net_outputs_offset =28
test_iterations = “Dynamic”
epochs = 10
layers = 6
forward=False
data_input_pth = “./mixcolumns_text.csv”
labels_input_pth = “./mixcolumns_cipher.csv”
num_epochs = 10
test_acc_hist = []
train_files = []
test_files = []

data_input_path = “./split_files_temp”
datafix= ‘mixcolumns_text.csv-’
labelfix= ‘mixcolumns_cipher.csv-’
train_count = 8
test_count = 2
batch_size =100
b_size=100
t_size=10
model=“SKIP”
dataset=“FILE”
activation = “arctan”

if (forward == True):
prefix = ‘Forward’
else:
prefix = ‘Backward’

results_file= f’./{prefix}Rijandel_Mix_Columns{model}-Network_Loading-{dataset}_Trainset-{train_count}_TestSet-{test_count}Evaluation{test_iterations}Outputs-Offset{net_outputs_offset}Outputs{net_outputs}.csv’
model1_filename = f’./{prefix}_Model-Iteration-{model}-Network_Loading-{dataset}_Trainset-{train_count}_Testset-{test_count}Evaluation{test_iterations}Outputs-Offset{net_outputs_offset}Outputs{net_outputs}.csv’

device = torch.device(“cuda:1” if torch.cuda.is_available() else “cpu”)
torch.manual_seed(0)
print(device)

def binary(vector):
zero = torch.Tensor([0.0]).to(device)
one = torch.Tensor([1.0]).to(device)
return torch.where(torch.sigmoidi(vector)>0.5,one,zero)

def binaryC(vector):
zero = torch.Tensor([0.0])
one = torch.Tensor([1.0])
return torch.where(torch.sigmoid(vector)>0.5,one,zero)

class RijiDatasetData(torch.utils.data.Dataset):
#Step 1 when creating a personal Dataset - override init()

given is a path, then prefixes for inputs and outputs file and a number indicating the size of the training and testing set (in files)

#in this case i have shorten data with 100 samples of 511x32 size #torch.size[100,1,32] #each file contains 10000000 samples…
def init(self, data_path, datafix, labelfix, number, forward, out_size, out_offsize, test):
self.data_path = data_path
self.output_size = out_size
self.output_offsize = out_offsize
if (forward == True):
self.input = datafix
self.output = labelfix
else:
self.input = labelfix
self.output = datafix
#Initiate the train set from the first 80% of picked up files
rand_array = [100] #random.sample(range(0,430),number)
output_str_array = [str(integer).rjust(6,‘0’) for integer in rand_array]

    input_str_array = [f'{self.data_path}/{self.input}{str(integer).rjust(6,"0")}.txt' for integer in rand_array]
    output_str_array = [f'{self.data_path}/{self.output}{str(integer).rjust(6,"0")}.txt' for integer in rand_array]
    if (test == 1):
        test_files.append(input_str_array)
    else:
        train_files.append(input_str_array)

    print(input_str_array)
    print(output_str_array)
    self.input_data = []
    self.output_data = []
    for path in input_str_array:
        in_file = open(path)
        for l in in_file:

self.input_data.append(torch.tensor(np.fromstring(l, dtype=float, sep=’ ‘), dtype=torch.float))
for path in output_str_array:
out_file = open(path)
for l in out_file:
self.output_data.append(torch.tensor(np.fromstring(l, dtype=float, sep=’ '), dtype=torch.float))

    self.input_data = torch.stack(self.input_data)
    self.output_data = torch.stack(self.output_data)

def __len__(self):
    return len(self.input_data)

def __getitem__(self,idx):
    input_data = self.input_data[idx]
    output_data = self.output_data[idx][self.output_offsize:self.output_size+self.output_offsize]

    return input_data, output_data

#Do not use for individual bits learning. Works only for the whole dataset and whole width
class RijiDataset(torch.utils.data.IterableDataset):

#Step 1 when creating a personal Dataset - override __init__()
def __init__(self, datafix, labelfix, number, forward, out_size, out_offsize, test):
    self.output_size = out_size
    self.output_offsize = out_offsize
    if (forward == True):
        self.input_path = datafix
        self.outputpath = labelfix
    else:
        self.input_path = labelfix
        self.output_path = datafix
    self.output_size = out_size

def in_line_mapper(self,line):
    lin = line.rstrip("\n")
    v = torch.tensor(np.fromstring(lin, dtype=float, sep=' '), dtype=torch.float)
    return v
def out_line_mapper(self,line):
    lin = line.rstrip("\n")
    v = torch.tensor(np.fromstring(lin, dtype=float, sep=' '), dtype=torch.float)
    return v[self.output_offsize:self.output_size+self.output_offsize]

def __iter__(self):

    input_itr = open(self.input_path)
    output_itr = open(self.output_path)

    mapped_in_itr = map(self.in_line_mapper, input_itr)
    mapped_ou_itr = map(self.out_line_mapper, output_itr)

    zipped_itr = zip(mapped_in_itr,mapped_ou_itr)
    return zipped_itr

#ending dataset from rij1 mixed columns

subset=10

dtype = torch.float
device = torch.device(“cuda”) if torch.cuda.is_available() else torch.device(“cpu”)

Define a transform

transform = transforms.Compose([
transforms.Resize((28, 28)),
transforms.Grayscale(),
transforms.ToTensor(),
transforms.Normalize((0,), (1,))])

neuron and simulation parameters

spike_grad = surrogate.fast_sigmoid(slope=25)
beta = 0.5
num_steps = 50
lif1 = snn.Leaky(beta=beta, spike_grad=spike_grad)

#construct spike model

class Net(nn.Module):
def init(self, net_inputs, net_outputs):
super(Net,self).init()
self.conv1 = nn.Conv1d(100, 1 , 3)
self.lif1 = snn.Leaky(beta=beta, spike_grad=spike_grad)
self.conv1 = nn.Conv1d(1, 100, 3)
self.lift2 = snn.Leaky(beta=beta, spike_grad=spike_grad)
self.fc1 = nn.Linear(28, 100)
self.lif3 = snn.Leaky(beta=beta, spike_grad=spike_grad)

def forward(self, x):
    mem1 = self.lift1.init_leaky()
    mem2 = self.lift2.init_leaky()
    mem3 = self.lift3.init_leaky()
    

    spk3_rec = []
    mem3_rec = []

    for step in range(num_steps):
        cur1 = F.max_pool2d(self.conv1(x), 2)
        spk1,mem1 = self.lif1(cur1, mem1)
        cur2 = F.max_pool2d(self.conv2(spk1), 2)
        spk2,mem2 = self.lif2(cur2,mem2)
        cur3 = self.fc1(spk2.view(batch_size, -1))
        spk3,mem3 = self.lif3(cur3, mem3)

        spk3_rec.append(spk3)
        mem3_rec.append(mem3)

    return torch.stack(spk3_rec) , torch.stack(mem3_rec)

f (model == “SKIP”):
Mmodel = Net(net_inputs,net_outputs).to(device)
elif (model == “FC”):
Mmodel = CModel(net_inputs,net_outputs).to(device)
elif (model == “CONV”):
Mmodel = ConvModel(net_inputs,net_outputs).to(device)

#criterion1 = nn.MSELoss()
criterion1 = nn.BCEWithLogitsLoss()
#optimizer1 = optim.Ranger(Mmodel.parameters(), lr=0.001,alpha=0.5, k=6,betas=(0.95, 0.999), eps=1e-08, weight_decay=0)
#optimizer1 = optim.Ranger21(Mmodel.parameters(), lr=0.01,alpha=0.5, k=6,betas=(0.95, 0.999), eps=1e-08, weight_decay=0, N_sma_threshold=5)
optimizer1 = torch.optim.Adam(Mmodel.parameters(), lr=1e-2, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)
##optimizer = torch.optim.Adam(net.parameters(), lr=1e-2, betas=(0.9, 0.999))
if (dataset == “FILE”):
train_dataset = RijiDatasetData(data_input_path,datafix, labelfix, train_count, forward, net_outputs, net_outputs_offset, test=0)
dataloader = DataLoader(train_dataset, batch_size=b_size, num_workers=8, shuffle=True)
elif(dataset == “POINTER”):
train_dataset = RijiDataset(data_input_pth, labels_input_pth, train_count, forward, net_outputs, net_outputs_offset, test=0, transform=transform)
#train_dataloader = DataLoader(train_dataset, batch_size=b_size, num_workers=8)
dataloader = DataLoader(train_dataset, batch_size=b_size, num_workers=8)

if (test_iterations == “Static” ):
test_dataset = RijiDatasetData(data_input_path,datafix, labelfix, test_count, forward, net_outputs, net_outputs_offset, test=1, transform=transform)
#test_dataloader = DataLoader(test_dataset, batch_size=b_size, num_workers=8)

Initialize Network

net = nn.Sequential(nn.Conv1d(100, 1 , 3),
nn.MaxPool1d(1),
snn.Leaky(beta=beta, spike_grad=spike_grad, init_hidden=True),
nn.Conv1d(1, 100, 3),
nn.MaxPool1d(1),
snn.Leaky(beta=beta, spike_grad=spike_grad, init_hidden=True),
nn.Flatten(),
nn.Linear(28,100),
snn.Leaky(beta=beta, spike_grad=spike_grad, init_hidden=True, output=True)
).to(device)

X, y = next(iter(dataloader))
X = X.to(device)
y = y.to(device)

for step in range(num_steps):
#for X, y in dataloader:
#X = torch.unsqueeze(X,dim=1).to(device)
print(X.shape)
print(y.shape)
spk_out, mem_out = net(X)
#mem_out = net(X)

def forward_pass(net, num_steps, X):
mem_rec = []
spk_rec = []
utils.reset(net) # resets hidden states for all LIF neurons in net

for step in range(num_steps):
#for X, y in dataloader:
    #X = torch.unsqueeze(X,dim=1).to(device)
    print(X.shape)
    #spk_out = net(X)
    #mem_out = net(X)
    spk_out, mem_out = net(X)
    spk_rec.append(spk_out)
    mem_rec.append(mem_out)

return torch.stack(spk_rec) , torch.stack(mem_rec)

spk_rec, mem_rec = forward_pass(net, num_steps, X)
#spk_rec = forward_pass(net, num_steps, X)

results1 = np.zeros([epochs,net_outputs+4])
running_loss1 = 0
running_loss2 = 0

max_accuracy = 0
target_0_1 = torch.zeros([1,net_outputs])
output_0_1 = torch.zeros([1,net_outputs])
for t in range(epochs):
Mmodel.train()
correct0 = 0.0
total0 = 0.0
for X, y in dataloader:

    #        X = torch.unsqueeze(X,dim=1).to(device)
#For CNN
    #X,y = torch.unsqueeze(X,dim=1).to(device), torch.unsqueeze(y,dim=1).to(device)
    #X,y = torch.squeeze(X,dim=1).to(device), torch.squeeze(y,dim=1).to(device)
    spk_out, mem_out = net(X)
    #spk_rec.append(spk_out)
    #mem_rec.append(mem_out)
    #out = Mmodel(X)

#For CNN
##loss1 = criterion1(torch.unsqueeze(out,dim=1),y)
#loss1 = criterion1(torch.squeeze(out,dim=1),y)
###loss1 = criterion1(out,y)
# already imported snntorch.functional as SF
loss_fn = SF.ce_rate_loss()
loss_val = loss_fn(spk_rec, X)
optimizer1.zero_grad()
loss1.backward()
optimizer1.step()
running_loss1 += loss1.item()
#learning accuracy
outs, ys = out.to(device), y.to(device)
total0 += ys.size(0)
outs = binaryC(outs)
target_0_1 += torch.sum(ys, dim=0)
output_0_1 = torch.sum(outs, dim=0)
correct0 += ((outs == ys).sum().item())/(ys.size(2))

print(f"The loss from an untrained network is {loss_val.item():.3f}")
acc = SF.accuracy_rate(spk_rec, net_outputs)
print(f"The accuracy of a single batch using an untrained network is {acc*100:.3f}%")
print('[%d] loss: %.5f and accuracy %.5f ' %(t + 1, running_loss1, 100*correct0/total0))
if (running_loss1/1000 == 0.0 or running_loss1 == running_loss2):
    break
results1[t][0] = t
results1[t][1] = running_loss1
results1[t][2] = 100* correct0/total0
results1[t][3:3+net_outputs] = target_0_1/total0

running_loss2 = running_loss1
running_loss1 = 0.0


Mmodel.eval()
correct1 = 0.0
total = 0.0
if (test_iterations == "Static" ):
    dataloader = DataLoader(test_dataset, batch_size=t_size, num_workers=8, shuffle=False)
elif (test_iterations == "Dynamic" ):
    test_dataset = RijiDatasetData(data_input_path,datafix, labelfix, test_count, forward, net_outputs, net_outputs_offset, test=1)
    dataloader = DataLoader(test_dataset, batch_size=t_size, num_workers=8, shuffle=False)
else:
    dataloader = DataLoader(train_dataset, batch_size=t_size, num_workers=8)
    with torch.no_grad():
        for X, y in dataloader:
            X,y = torch.unsqueeze(X,dim=1).to(device), torch.unsqueeze(y,dim=1).to(device)

outputs1 = Mmodel(X)
oututs, ys = outputs1.to(device), y.to(device)
total += ys.size(0)
outs = binaryC(oututs)
correct1 += ((outs == ys).sum().item())/(ys.size(2))

print('Accuracy of the network on the {} test images: {} '.format(total, (100*correct1 / total)))
results1[t][net_outputs+3] = 100* correct1/total
if (results1[t][3] > max_accuracy):
    max_accuracy = results1[t][3]
    torch.save(Mmodel.state_dict(), model1_filename)
if (results1[t][net_outputs+3] == 100):
    break

with open(results_file, ‘w’) as csvfile:

##spamwriter = csv.writer(csvfile, delimiter=' ')  
##spamwriter.writerow(train_files)                                                                                                       spamwriter.writerow(test_files)                                                                                                        spamwriter.writerows(results1)

def batch_accuracy(train_dataloader, net, num_steps):
with torch.no_grad():
total = 0
acc = 0
net.eval()

    train_dataloader = iter(train_dataloader)
    for Mmodel in train_loader:
        ConvModel(net_inputs,net_outputs).to(device)
        spk_rec, _ = forward_pass(net, num_steps, net_input)

        acc += SF.accuracy_rate(spk_rec, Mmodel = ConvModel(net_inputs,net_outputs).to(device)) * spk_rec.size(1)
        total += spk_rec.size(1)

return acc/total

test_acc = batch_accuracy(test_dataloader, net, num_steps)

print(f"The total accuracy on the test set is: {test_acc * 100:.2f}%")
for epoch in range(num_epochs):

avg_loss = backprop.BPTT(net, train_dataloader, optimizer=optimizer, criterion=loss_fn,
                        num_steps=num_steps, time_var=False, device=device)

print(f"Epoch {epoch}, Train Loss: {avg_loss.item():.2f}")

# Test set accuracy
test_acc = batch_accuracy(train_dataloader, net, num_steps)
test_acc_hist.append(test_acc)

print(f"Epoch {epoch}, Test Acc: {test_acc * 100:.2f}%\n")

Plot Loss

fig = plt.figure(facecolor=“w”)
plt.plot(test_acc_hist)
plt.title(“Test Set Accuracy”)
plt.xlabel(“Epoch”)
plt.ylabel(“Accuracy”)
plt.show()

spk_rec, mem_rec = forward_pass(net, num_steps, net_inputs)

from IPython.display import HTML

idx = 0

fig, ax = plt.subplots(facecolor=‘w’, figsize=(12, 7))
labels=[‘0’, ‘1’, ‘2’, ‘3’, ‘4’, ‘5’, ‘6’, ‘7’, ‘8’,‘9’]
print(f"The target label is: {net_outputs[idx]}")

plt.rcParams[‘animation.ffmpeg_path’] = ‘C:\path\to\your\ffmpeg.exe’

Plot spike count histogram

anim = splt.spike_count(spk_rec[:, idx].detach().cpu(), fig, ax, labels=labels,
animate=True, interpolate=4)

HTML(anim.to_html5_video())
anim.save(“spike_bar.mp4”)

with open(results_file, ‘w’) as csvfile:
spamwriter = csv.writer(csvfile, delimiter=’ ')
spamwriter.writerow(train_files)
spamwriter.writerow(test_files)
spamwriter.writerows(results1)

error message

Traceback (most recent call last):
File “/raid/rijndael-mixcolumns/spike1.py”, line 447, in
spk_rec, mem_rec = forward_pass(net, num_steps, X)
File “/raid/rijndael-mixcolumns/spike1.py”, line 441, in forward_pass
spk_out, mem_out = net(X)
File “/raid/rijndael-mixcolumns/envs/pytorch/lib/python3.9/site-packages/torch/nn/modules/module.py”, line 1130, in _call_impl
return forward_call(*input, **kwargs)
File “/raid/rijndael-mixcolumns/envs/pytorch/lib/python3.9/site-packages/torch/nn/modules/container.py”, line 139, in forward
input = module(input)
File “/raid/rijndael-mixcolumns/envs/pytorch/lib/python3.9/site-packages/torch/nn/modules/module.py”, line 1130, in _call_impl
return forward_call(*input, **kwargs)
File “/raid/rijndael-mixcolumns/envs/pytorch/lib/python3.9/site-packages/torch/nn/modules/conv.py”, line 307, in forward
return self._conv_forward(input, self.weight, self.bias)
File “/raid/rijndael-mixcolumns/envs/pytorch/lib/python3.9/site-packages/torch/nn/modules/conv.py”, line 303, in _conv_forward
return F.conv1d(input, weight, bias, self.stride,
RuntimeError: Given groups=1, weight of size [1, 100, 1], expected input[100, 1, 32] to have 100 channels, but got 1 channels instead

error
cuda:1
cuda
[‘./split_files_temp/mixcolumns_cipher.csv-000100.txt’]
[‘./split_files_temp/mixcolumns_text.csv-000100.txt’]
torch.Size([100, 32])
torch.Size([100, 4])
Traceback (most recent call last):
File “/raid/rijndael-mixcolumns/spike1.py”, line 426, in
spk_out, mem_out = net(X)
File “/raid/rijndael-mixcolumns/envs/pytorch/lib/python3.9/site-packages/torch/nn/modules/module.py”, line 1130, in _call_impl
return forward_call(*input, **kwargs)
File “/raid/rijndael-mixcolumns/envs/pytorch/lib/python3.9/site-packages/torch/nn/modules/container.py”, line 139, in forward
input = module(input)
File “/raid/rijndael-mixcolumns/envs/pytorch/lib/python3.9/site-packages/torch/nn/modules/module.py”, line 1130, in _call_impl
return forward_call(*input, **kwargs)
File “/raid/rijndael-mixcolumns/envs/pytorch/lib/python3.9/site-packages/torch/nn/modules/linear.py”, line 114, in forward
return F.linear(input, self.weight, self.bias)
RuntimeError: mat1 and mat2 shapes cannot be multiplied (100x32 and 100x32)