Hi, thank you very much.
Here is the essential code with the essential functions.
- Data and training of “model” (no debugging required)
import matplotlib.pyplot as plt
import numpy as np
import math, random
import pandas as pd
from pandas import DataFrame
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import torch.nn.init as init
import torch.utils.data
import torch.optim
from torch.optim import lr_scheduler
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import r2_score
from scipy.signal import gausspulse
%matplotlib notebook
# DC-motor parameters: J = rotor inertia, b = viscous friction,
# K = motor (torque/emf) constant, R = armature resistance, L = armature inductance
J = 0.001; b = 0.1; K = 0.01; R = 1; L = 0.5
# define the continuous-time system matrices
A=np.matrix([[-b/J, K/J],[- K/L, -R/L]])
B=np.matrix([[0],[1/L]])
C=np.matrix([[1, 0]])
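# For reference (my annotation, assuming the usual DC-motor model implied by the
# speed/current/voltage labels used later): with state x = [omega; i] and input u = V,
# the matrices above encode
#   d(omega)/dt = (-b*omega + K*i)/J
#   d(i)/dt     = (-K*omega - R*i + u)/L
#   y = omega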
amp=10
#define an initial state for simulation
#x0=np.random.rand(2,1)
x0=np.array([[0],[0]])
#define the number of time-samples used for the simulation
#and the sampling time for the discretization
time_tot=50
sampling=0.1
time=int(time_tot /sampling)
min_u=1
max_u=10
step_u=1
##########define an input sequence for the simulation
#input voltage
tt= np.arange(0, time, 1)
freq_sin=0.05
freq_sin_val=0.05
in_seq=1*np.sin((freq_sin)*tt)
#in_seq=amp*np.ones(time)
#input_seq=np.random.rand(time,1)
#in_seq=amp*np.random.rand(time,1)
#in_seq=amp*np.ones(time)
#print(in_seq)
%matplotlib notebook
plt.plot(in_seq)
plt.title("in seq chosen for system simulation")
plt.show()
##### calculate the discrete-time system using the backward Euler method
# the following function simulates the state-space model using the backward Euler method
# the input parameters are:
# -- A,B,C - continuous time system matrices
# -- initial_state - the initial state of the system
# -- time_steps - the total number of simulation time steps
# -- sampling_period - the sampling period for the backward Euler discretization
# this function returns the state sequence and the output sequence
# they are stored in the vectors Xd and Yd respectively
def simulate(A,B,C,initial_state,input_sequence, time_steps,sampling_period):
    from numpy.linalg import inv
    I=np.identity(A.shape[0]) # identity matrix
    Ad=inv(I-sampling_period*A)
    Bd=Ad*sampling_period*B
    Xd=np.zeros(shape=(A.shape[0],time_steps+1))
    Yd=np.zeros(shape=(C.shape[0],time_steps+1))
    for i in range(0,time_steps):
        if i==0:
            Xd[:,[i]]=initial_state
            Yd[:,[i]]=C*initial_state
            x=Ad*initial_state+Bd*input_sequence[i]
        else:
            Xd[:,[i]]=x
            Yd[:,[i]]=C*x
            x=Ad*x+Bd*input_sequence[i]
    Xd[:,[-1]]=x
    Yd[:,[-1]]=C*x
    return Xd, Yd
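# For reference (my annotation), the backward Euler update implemented above is
#   x_{k+1} = (I - Ts*A)^{-1} * (x_k + Ts*B*u_k),   y_k = C*x_k
# i.e. Ad = (I - Ts*A)^{-1} and Bd = Ad*Ts*B, with sampling period Ts.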
state,output=simulate(A,B,C,x0,in_seq, time ,sampling)
print(state.shape)
%matplotlib notebook
plt.figure(2)
plt.title("state trajectory with chosen input")
plt.plot(state[1])
plt.show()
#XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX Training data XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
######################## create stacked input profiles #####################
input_mag_train=np.arange(min_u,max_u,step_u)
u_len_train=input_mag_train.shape[0]
print("u_len_train is:" ), print(u_len_train)
tt= np.arange(0, time, 1)
u_train=[]
for i in input_mag_train:
    #u1_train=i*np.ones(time)
    u1_train=i*np.sin((freq_sin)*tt)
    u_train.append(u1_train)
u_array_train=np.array(u_train)
####################################################################
print("u train shape is:"),print(u_array_train.shape)
print("u train is:"),print(u_array_train)
############################ create stacked state data ################################
ss_train=[]
for j in range(0,u_array_train.shape[0]):
    # choose the input profile
    input_seq_train=u_array_train[j,:]
    # initial conditions
    x0=np.array([[0],[0]])
    x0_train=x0
    state_train,output=simulate(A,B,C,x0_train,input_seq_train, time ,sampling)
    ss_train.append(state_train)
#convert the list into array
ss_train_array=np.array(ss_train)
#print('ss array train is dim:(u_len, no of states, time length+1)): '),print(ss_train_array)
m1=ss_train_array[0,0,:].reshape(-1,1)
m2=ss_train_array[0,1,:].reshape(-1,1)
m12=np.concatenate((m1,m2),axis=1)
#print("m12 at begining :"),print(m12)
for j in range(1,u_array_train.shape[0]):
    a1=ss_train_array[j,0,:].reshape(-1,1)
    a2=ss_train_array[j,1,:].reshape(-1,1)
    a12=np.concatenate((a1,a2),axis=1)
    m12=np.concatenate((m12,a12),axis=0)
print("m12 is:", m12)
# m12 is the state matrix concatenated and arranged as desired:
# column 1 is x1, column 2 is x2 (the input trajectory is appended as column 3 below)
# the data must be transformed (standardised) here
# the scaler is defined and fit once, and once only, on the training set;
# the same fitted scaler must then be applied (transform only) to all future
# datasets, including the test and validation sets
# defining the scaler
scaler=MinMaxScaler(feature_range=(-1,1))
# fit and transform the X-train set
m12=scaler.fit_transform(m12)
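# Side note (my addition, standard sklearn API): predictions made in this scaled
# (-1,1) range can later be mapped back to physical units with the same scaler, e.g.
#   Y_physical = scaler.inverse_transform(Y_pred1)   # Y_pred1: a 2-column array built later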
# prepend a zero to each input profile so its length matches the state length (time+1)
zeros_to_add=np.zeros(shape=(int((max_u-min_u)/step_u),1 ))
um=np.concatenate((zeros_to_add,u_array_train),axis=1)
# create the column vector of control inputs
cc=um.reshape(-1,1)
print ("u array train is", u_array_train)
#print ("u array train transformed is",scaler.transform(u_array_train))
# concatenate with the state vector
X_train=np.concatenate((m12,cc),axis=1)
# now, use X_train to create Y_train (the target for the NN)
# to that end, first remove the first row of X_train
test= np.delete(X_train, 0, axis=0)
# then remove the 3rd column (the input) of X_train
Y_train1= np.delete(test, 2, axis=1)
# finally, to ensure X and Y have equal length, remove the last row of X_train
X_train1= np.delete(X_train, (X_train.shape[0]-1), axis=0)
print("X_train1 shape is :"), print(X_train1.shape)
print("Y_train1 shape is :"), print(Y_train1.shape)
print("X_train1:"), print(X_train1)
print("Y_train1 :"), print(Y_train1)
%matplotlib notebook
plt.figure()
plt.title("training state trajectory with chosen input")
plt.plot(X_train1)
plt.show()
# setting the hyperparameters for the NN
HP = {
    'TRAIN_SPLIT' : .8 ,
    'MINI_BATCH_SIZE' : 500, #10*int((2/3)*(u_array_train.shape[1]+1)/4)
    'NUM_EPOCHS' : 300,
    'LEARNING_RATE' : 5e-3 ,
    'LEARNING_RATE_DECAY' : 500 ,
    'WEIGHT_DECAY' : 5e-4 ,
    'NUM_MOMENTUM' : 0.9 ,
    'NUM_PATIENCE' : 50 ,
    'SEED' : 2018
}
np.random.seed(HP['SEED'])
torch.manual_seed(HP['SEED'])
# some essential functions that are used throughout
def to_np(x):
    return x.data.cpu().numpy()
# note: `async` is a reserved word in Python 3.7+, so the keyword argument is
# renamed to `non_blocking` (matching the modern torch.cuda API)
def to_var(x, non_blocking=False):
    if torch.cuda.is_available():
        x = x.cuda(non_blocking=non_blocking)
    return Variable(x)
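# If you are on PyTorch >= 0.4, Variable is deprecated and plain tensors carry
# autograd state themselves; a minimal modern sketch of the same helpers
# (my suggestion, assuming a single global device) would be:
#   device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#   def to_var(x, non_blocking=False):
#       return x.to(device, non_blocking=non_blocking)
#   def to_np(x):
#       return x.detach().cpu().numpy()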
####### XXXXXXXXXXXXXXXXXX Test data XXXXXXXXXXXXXXXXXXXXXXXXXXXXX
######################## create stacked input profiles #####################
freq_sin_test=0.05
min_u_test=1
max_u_test=2
step_u_test=1
input_mag_test=np.arange(min_u_test,max_u_test,step_u_test)
u_len_test=input_mag_test.shape[0]
#input_mag_test= np.arange(0, time, sampling)
#u_len_test=input_mag_test.shape[0]
u_test=[]
# time axis for the sine-wave creation
tt= np.arange(0, time, 1)
for i in input_mag_test:
    #u1_test=i*np.ones(time)
    #u1_test=i*np.random.rand(time,1)
    u1_test=i*np.sin((freq_sin_test)*tt)
    u_test.append(u1_test)
u_array_test=np.array(u_test)
#print('uarray is'),print(u_array_test)
####################################################################
####### create stacked state data ################################
ss_test=[]
for j in range(0,u_array_test.shape[0]):
    # choose the input profile
    input_seq_test=u_array_test[j,:]
    # initial conditions
    x0=np.array([[0],[0]])
    x0_test=x0
    state_test,output=simulate(A,B,C,x0_test,input_seq_test, time ,sampling)
    ss_test.append(state_test)
#convert the list into array
ss_test_array=np.array(ss_test)
#print('array ssnp is dim:(u_len, no of states, time length+1)): '),print(ss_test_array)
#### creating the concatenated arrays for the test data
m1=ss_test_array[0,0,:].reshape(-1,1)
m2=ss_test_array[0,1,:].reshape(-1,1)
m12t=np.concatenate((m1,m2),axis=1)
#print("m12 at begining :"),print(m12)
for j in range(1,u_array_test.shape[0]):
    a1=ss_test_array[j,0,:].reshape(-1,1)
    a2=ss_test_array[j,1,:].reshape(-1,1)
    a12=np.concatenate((a1,a2),axis=1)
    m12t=np.concatenate((m12t,a12),axis=0)
# m12t is the test state matrix arranged the same way:
# column 1 is x1, column 2 is x2 (the input is appended as column 3 below)
# transform the test data with the scaler already fitted on the training set
# (transform only -- never refit the scaler on test or validation data)
m12t=scaler.transform(m12t)
# prepend a zero to each test input profile so its length matches the state length (time+1)
zeros_to_add_test=np.zeros(shape=(int((max_u_test-min_u_test)/step_u_test),1 ))
um=np.concatenate((zeros_to_add_test,u_array_test),axis=1)
# create the column vector of control inputs
cc=um.reshape(-1,1)
# concatenate with the state vector
X_test=np.concatenate((m12t,cc),axis=1)
# create Y_test from here
# to that end, first remove the first row of X_test
test= np.delete(X_test, 0, axis=0)
# then remove the 3rd column (the input) of X_test
Y_test1= np.delete(test, 2, axis=1)
# finally, to ensure X and Y have equal length, remove the last row of X_test
X_test1= np.delete(X_test, (X_test.shape[0]-1), axis=0)
print("X_test1 shape is :"), print(X_test1.shape)
print("Y_test1 shape is :"), print(Y_test1.shape)
%matplotlib notebook
plt.figure()
plt.title("test X trajectory with chosen input")
plt.plot(X_test1)
plt.show()
plt.figure()
plt.title("test Y trajectory with chosen input")
plt.plot(Y_test1)
plt.show()
#print ("ssnp_val was:"), print(ss_val_array)
#print("X_test is:"),print(X_test1)
#print("Y test is :"),print(Y_test1)
#print(X_test1.shape),print(Y_test1.shape)
# finally, construct the test loader for PyTorch
#test_x, test_y=x_data[test_idx],y_data[test_idx]
test_set=torch.utils.data.TensorDataset(torch.FloatTensor(X_test1),torch.FloatTensor(Y_test1))
test_loader=torch.utils.data.DataLoader(test_set,batch_size=HP['MINI_BATCH_SIZE'],
shuffle=False,
pin_memory=True, num_workers=0)
# create the training set comprising both x and y
train_set=torch.utils.data.TensorDataset(torch.FloatTensor(X_train1),torch.FloatTensor(Y_train1))
train_loader=torch.utils.data.DataLoader(train_set,batch_size=HP['MINI_BATCH_SIZE'],
shuffle=True,pin_memory=True,num_workers=0)
# build the neural network structure
class Network(nn.Module):
    def __init__(self,D_in,D_out):
        super().__init__()
        # input-to-hidden and hidden-to-hidden linear transformations
        self.lin1 = nn.Linear(D_in, 100)
        self.lin2 = nn.Linear(100,100)
        self.lin3 = nn.Linear(100,100)
        # output layer
        self.output = nn.Linear(100, D_out)
    def forward(self, x):
        # this is where the data flows through the network; the sequence of
        # layers in the forward method is very important
        x = torch.tanh(self.lin1(x))  # torch.tanh replaces the deprecated F.tanh
        x = torch.tanh(self.lin2(x))
        x = torch.tanh(self.lin3(x))
        # tanh on the output keeps predictions in (-1,1), matching the scaler range
        y = torch.tanh(self.output(x))
        return y
model = Network(X_train1.shape[1], Y_train1.shape[1])
criterion = torch.nn.MSELoss(reduction='sum')  # size_average=False is deprecated
optimizer = torch.optim.SGD(model.parameters(),
                            lr=HP['LEARNING_RATE'])
# optionally: momentum=HP['NUM_MOMENTUM'], weight_decay=HP['WEIGHT_DECAY']
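# Note (my addition): HP['LEARNING_RATE_DECAY'] and the lr_scheduler import are
# currently unused; if a step decay was intended, a minimal sketch would be
#   scheduler = lr_scheduler.StepLR(optimizer, step_size=HP['LEARNING_RATE_DECAY'], gamma=0.1)
# with scheduler.step() called once per epoch.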
model.train()
train_losses = []
valid_losses = []
valid_score = []
epochs=[]
epoch_iter = range(1, HP['NUM_EPOCHS'] + 1)
for epoch in epoch_iter:
    epochs.append(epoch)
    # training over all batch data
    batch_idx, tloss_avg, vloss_avg = 0, 0, 0
    for batch_idx, (data, target) in enumerate(train_loader):
        y_pred = model(to_var(data))               # predict y based on x
        loss = criterion(y_pred, to_var(target))   # compute loss
        optimizer.zero_grad()                      # clear gradients
        loss.backward()                            # compute gradients
        optimizer.step()                           # apply gradients
        tloss_avg += loss.item()
    tloss_avg /= batch_idx+1
    train_losses.append(tloss_avg)
    print("Epoch: %d, Train loss: %.4f" % (epoch, tloss_avg))  # %.4f: %d truncated the loss to an int
##################### test the trained model ########################
model.eval()
y_pred = []
for data, target in test_loader:
    output = model(to_var(data))
    y_pred.append(to_np(output))
y_pred = np.array(y_pred)
# initialize and gather all the components of y_pred in a single 2-column array
Y_pred1=y_pred[0]
for i in range(1,y_pred.shape[0]):
    Y_pred1=np.concatenate((Y_pred1,y_pred[i]),axis=0)
print("Y_pred1 shape is :",Y_pred1.shape)
####### plotting the figure ###########
%matplotlib notebook
plt.figure(figsize=(7,6))
plt.plot(Y_test1[:,0],label='Test speed', color='r')
plt.plot(Y_pred1[:,0],label='Predicted speed', color='k')
plt.plot(Y_pred1[:,1],label='Predicted current', color='g')
plt.plot(Y_test1[:,1],label='Test current', color='b')
plt.title("Prediction and True Values")
plt.xlabel("Indexes");
plt.ylabel("Amplitude");
plt.grid(True)
plt.legend(loc='upper right');
from numpy import linalg as LA
error=Y_pred1-Y_test1
#print(error)
# relative prediction error, in percent
error_percentage=LA.norm(error,2)/LA.norm(Y_test1,2)*100
print("Error percentage is",error_percentage)
- Then, data generation for training of “modelFCNN”
################## training FCNN using trained NN "model" ############################
################ this section describes data preparation and
################ can be skipped when debugging #######################
%matplotlib notebook
# first prepare the reference input u_ref, used to generate r_k and r_k+1
min_u2_train = 1
max_u2_train = 3
step_u2_train = 1
ref_mag_train = np.arange(min_u2_train,max_u2_train,step_u2_train)
ref_u_len = ref_mag_train.shape[0]
print(ref_u_len)
ref_tot_time = 60
ref_time = int(ref_tot_time/0.1)  # 0.1 = sampling period
ref_tt = np.arange(0, ref_time, 1)
ref_freq_sin = 0.03
ref_u_train = []
for i in ref_mag_train:
    #ref_u1_train=i*np.ones(ref_time)
    ref_u1_train=i*np.sin((ref_freq_sin)*ref_tt)
    ref_u_train.append(ref_u1_train)
ref_u_array_train=np.array(ref_u_train)
####################################################################
#print("ref_u_train is %s" %(ref_u_array_train))
print("ref_time is %s" %(ref_time))
#
########################### create stacked state data ################################
ref_ss_train=[]  # ref_ss holds r_k; r_k+1 will be created from it later
for j in range(0,ref_u_array_train.shape[0]):
    # choose the input profile
    input_seq_train=ref_u_array_train[j,:]
    # initial conditions
    x0=np.array([[0],[0]])
    x0_train=x0
    state_train,output=simulate(A,B,C,x0_train,input_seq_train, ref_time ,sampling)
    ref_ss_train.append(state_train)
#convert the list into array
ref_ss_train_array=np.array(ref_ss_train)
#print('array ssnp is dim:(u_len, no of states, time length+1)): '),print(ss_train_array)
#print ("ref_ss_train_array is %s"
# %(ref_ss_train_array))
##################### DATA FORMATTING for ref X_train (r_k) and ref Y_train (r_k+1) ##########################
# here, the previously created reference states are rearranged into a 2-column array
m1=ref_ss_train_array[0,0,:].reshape(-1,1)
m2=ref_ss_train_array[0,1,:].reshape(-1,1)
m12r=np.concatenate((m1,m2),axis=1)
#print("m12r at begining :"),print(m12r)
for j in range(1,ref_u_array_train.shape[0]):
    a1=ref_ss_train_array[j,0,:].reshape(-1,1)
    a2=ref_ss_train_array[j,1,:].reshape(-1,1)
    a12=np.concatenate((a1,a2),axis=1)
    m12r=np.concatenate((m12r,a12),axis=0)
# transform the reference states with the scaler already fitted on the training set (transform only)
R_k=scaler.transform(m12r)
# create r_k+1 from here
# to that end, first remove the first row of R_k
R_k_plus_1_train = np.delete(R_k, 0, axis=0)
#print ("R_k_plus1_train is %s" %(R_k_plus_1_train))
# finally, to ensure r_k and r_k+1 have equal length, remove the last row of R_k
R_k_train= np.delete(R_k, (R_k.shape[0]-1), axis=0)
#print ("R_k train is %s" %(R_k_train))
print("R_k_train shape is :"), print(R_k_train.shape)
print("R_k_plus1_train shape is :"), print(R_k_plus_1_train.shape)
%matplotlib notebook
plt.figure(103)
plt.plot(R_k_train,label='r_k train', color='r')
plt.plot(R_k_plus_1_train,label='r_k+1 train', color='b')
plt.title("r_k and r_k+1 train data")
plt.xlabel("time indexes");
plt.ylabel("amplitude");
plt.grid(True)
plt.legend(loc='upper right');
plt.show()
- Then, hyperparameters and structure of the to-be-trained model FCNN
####################### declaring NN hyperparameters and NN structure
HP1 = {
    'TRAIN_SPLIT' : .8 ,
    'MINI_BATCH_SIZE' : 500, #10*int((2/3)*(u_array_train.shape[1]+1)/4)
    'NUM_EPOCHS' : 100,
    'LEARNING_RATE' : 5e-2 ,
    'LEARNING_RATE_DECAY' : 500 ,
    'WEIGHT_DECAY' : 5e-4 ,
    'NUM_MOMENTUM' : 0.9 ,
    'NUM_PATIENCE' : 50 ,
    'SEED' : 2018
}
# create the FCNN network and define all other parameters
class NetworkFCNN(nn.Module):
    def __init__(self,D_in,D_out):
        super().__init__()
        # input-to-hidden and hidden-to-hidden linear transformations
        self.lin1 = nn.Linear(D_in, 100)
        self.lin2 = nn.Linear(100,100)
        self.lin3 = nn.Linear(100,100)
        # output layer
        self.output = nn.Linear(100, D_out)
    def forward(self, x):
        # data flow of the network; the order of the layers matters
        x = F.relu(self.lin1(x))
        x = F.relu(self.lin2(x))
        x = F.relu(self.lin3(x))
        # tanh bounds the FCNN output (the control fed to "model") to (-1,1)
        y = torch.tanh(self.output(x))  # torch.tanh replaces the deprecated F.tanh
        return y
# create the training set comprising both r_k and r_k+1
train_set_ref=torch.utils.data.TensorDataset(torch.FloatTensor(R_k_train),torch.FloatTensor(R_k_plus_1_train))
train_loader_ref=torch.utils.data.DataLoader(train_set_ref,batch_size=HP1['MINI_BATCH_SIZE'],
shuffle=True,pin_memory=True,num_workers=0)
################ finally, training the FCNN using the
################ output from the already trained "model" #########################
modelFCNN = NetworkFCNN(R_k_plus_1_train.shape[1], 1)
criterion1 = torch.nn.MSELoss()
optimizer1 = torch.optim.Adam(modelFCNN.parameters(),
                              lr=HP1['LEARNING_RATE'])
# optionally: weight_decay=HP1['WEIGHT_DECAY']
#scheduler1 = torch.optim.lr_scheduler.LambdaLR(optimizer1, lr_lambda=[epoch_smooth_decay])  # select the LR schedule
- Finally, the part that needs debugging: training of the FCNN using the already trained “model”
modelFCNN.train()
model.eval()
ref_train_losses = []
ref_valid_losses = []
ref_valid_score = []
ref_epochs=[]
epoch_iter = range(1, HP1['NUM_EPOCHS'] + 1)
for epoch in epoch_iter:
    ref_epochs.append(epoch)
    # training over all batch data
    batch_idx_ref, tloss_avg_ref, vloss_avg_ref = 0, 0, 0
    for batch_idx_ref, (data_rk, data_rk_plus1) in enumerate(train_loader_ref):
        y_pred_ref = modelFCNN(to_var(data_rk_plus1))   # predict the control from the reference
        # concatenate the predicted control with the state vector along the columns
        sysnn_input = torch.cat( (data_rk,y_pred_ref) ,dim=1)
        # now feed this input to the trained system NN "model"
        output_sys = model(to_var(sysnn_input))
        # compute the loss
        loss_FCNN = criterion1(output_sys, to_var(data_rk_plus1))
        optimizer1.zero_grad()   # clear gradients
        loss_FCNN.backward()     # compute gradients
        optimizer1.step()        # apply gradients
        tloss_avg_ref += loss_FCNN.item()
    tloss_avg_ref /= batch_idx_ref+1   # bug fix: was batch_idx, leaked from the earlier training loop
    ref_train_losses.append(tloss_avg_ref)
    print("Epoch: %s, Train loss: %s" % (epoch, tloss_avg_ref))
Moreover, I have NOT explicitly set
requires_grad=True
on any input; I assume that is the default when passing inputs as a Variable?
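For reference, a quick way to check the default (a sketch of standard PyTorch >= 0.4 behaviour, not from the code above):

x = torch.FloatTensor([[0., 0., 0.]])  # a dummy 3-feature input, like one row of X_train1
print(x.requires_grad)   # False: plain tensors (and Variable(x)) default to requires_grad=False
y = model(x)             # the nn.Linear parameters DO require grad,
print(y.requires_grad)   # True: so the output tracks gradients and backward() still works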