I am pretty new to RL and I am trying to code a simple RL task with pytorch.
The goal/task is the following: The initial state is toto and the agent takes an action Δt: t_0+Δt=t_0+Δt=t_1.
If t_1 equals 450 or 475 then it gets a reward, else he does not get a reward.
I am training the agent with DQN algorithm on a NN ( with: 2 Linear layes: fist layer n_in=1 n_out 128 and second layer n_in=128 and n_out=5):
observation space(t_i) is 700 --> t_i∈[0,700[ti∈[0,700[
action space (Δt) is 5 --> (Δt∈[−50,−25,0,25,50]Δt∈[−50,−25,0,25,50])
epsilon_start=0.9#e-greedy threshold start value
epsilon_end=0.01#e-greedy threshold end value
epsilon_decay=200#e-greedy threshold decay learning_rate=0.001# NN
optimizer learning rate batch_size=64#Q-learning batch size
Unfortunately it does not seem to converge to the values t_i = 450 or 475. I doesn’t seem to care about getting a reward. How can I improve my code so that the agent learns what I am trying to teach him? I put my code below in case the explanations were not clear enough:
from gym import spaces
class RL_env(gym.Env):
metadata = {'render.modes': ['human']}
def __init__(self):
super(RL_env, self).__init__()
n_actions_delta = 1 #delta_t
self.action_space = spaces.Discrete(5)
n_observations = 1 #time
self.observation_space = spaces.Discrete(700)
#initial time
self.time = 0
self.done = 0
self.reward = 0
def reset(self):
self.reward = 0
self.done = False
return self.time
def step(self,delta_t):
print('self time',self.time)
d_t = np.arange(-50,70,25)
self.time = (self.time + d_t[delta_t])%700
print('delta time',d_t[delta_t],'-->','self time',self.time)
if self.time == 475 or self.time == 450:
self.reward+=1
else:
self.reward += 0
info = {}
print('total reward',self.reward)
print('\n')
return self.time,self.reward, self.done, info
def render(self, mode='human', close=False):
print()
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
from torch.distributions import Categorical
dtype = torch.float
device = torch.device("cpu")
import random
import math
import sys
if not sys.warnoptions:#igrnore warnings
import warnings
warnings.simplefilter("ignore")
#hyper parameters
epsilon_start=0.9
#e-greedy threshold start value
epsilon_end=0.01#e-greedy threshold end value
epsilon_decay=200#e-greedy threshold decay
learning_rate=0.001# NN optimizer learning rate
batch_size=64#Q-learning batch size
env = RL_env()
#use replay memory (-> to stabilize and improve our algorithm)for training: store transitions observed by agent,
#then reuse this data later
#sample from it randomly (batch built by transitions are decorrelated)
class ReplayMemory:#allowes the agent to learn from earlier memories (speed up learning and break undesirable temporal correlations)
def __init__(self, capacity):
self.capacity = capacity
self.memory = []
def push(self, transition):#saves transition
self.memory.append(transition)
if len(self.memory)>self.capacity:#if length of memory arra is larger than capacity (fixed)
del self.memory[0]#remove 0th element
def sample(self, batch_number):#samples randomly a transition to build batch
return random.sample(self.memory, batch_number)
def __len__(self):
return len(self.memory)
#Dqn NN (we want to maximize the discounted, cumulative reward)
#idea of Q-learning: we want to approximate with NN maximal Q-function (gives max return of action in given state)
#training update rule: use the fact that every Q-function for some policy obeys the Bellman equation
#difference between the two sides of the equality is known as the temporal difference error (we want to min -> Huber loss)
#calculate over batch of transitions sampled from the replay memory
class DqnNet(nn.Module):
def __init__(self):
super(DqnNet, self).__init__()
state_space = 1
action_space = env.action_space.n
num_hid = 128
self.fc1 = nn.Linear(state_space, num_hid)
self.fc2 = nn.Linear(num_hid, action_space)
self.gamma=0.5 #Q-learning discount factor (ensures that reward sum converges,
#makes actions from far future less important)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.sigmoid(self.fc2(x))
return x
#select action accordingly to epsilon greedy policy
#sometimes we use model for choosing action, other times sample uniformly
#probability of choosing a random action will start at epsilon_start and will decay (epsilon_decay) exponentially
#towards epsilon_end
steps_done=0
def predict_action(state):
global steps_done
sample=random.random()#random number
eps_threshold=epsilon_end+(epsilon_start-epsilon_end)*math.exp(-1.*steps_done/epsilon_decay)
steps_done += 1
if sample>eps_threshold:
x = eps_threshold,model(Variable(state,).type(torch.FloatTensor)).data.max(0)[1].view(1, 1)
return x#chose action from model
else:
x = eps_threshold,torch.tensor([[random.randrange(env.action_space.n)]])
return x#choose random action uniformly
#wtih the update_policy function we perform a single step of the optimization
#first sample a batch, concatenate all the tensors into a single one, compute Q-value and max Q-value,
#and combine them into loss
def update_policy():
if len(memory)<batch_size:#we want to sample a batch of size 64
return
transitions = memory.sample(batch_size)#take random transition batch from experience replay memory
batch_state, batch_action, batch_next_state, batch_reward = zip(*transitions)#convert batch-array of Transitions
#to Transition of batch-arrays
#-->zip(*) takes iterables as arguments and return iterator
batch_state = Variable(torch.cat(batch_state))#concatenate given sequence tensors in the given dimension
batch_state = batch_state.resize(batch_size,1)
batch_action = Variable(torch.cat(batch_action))
batch_next_state = Variable(torch.cat(batch_next_state))
batch_next_state = batch_next_state.reshape(batch_size,1)
batch_reward = Variable(torch.cat(batch_reward))
#print('model batch state',model(Variable(batch_state[0])))
current_q_values = model(batch_state).gather(1, batch_action)#current Q-values estimated for all actions,
#compute Q, then select the columns of actions taken,
#these are the actions which would've been taken
#for each batch state according to policy_net
max_next_q_values = model(batch_next_state).detach().max(1)[0]#predicted Q-values for non-final-next-states
#(-> gives max Q)
expected_q_values = batch_reward + (model.gamma * max_next_q_values)
#loss is measured from error between current and newly expected Q values (Huber Loss)
loss = F.smooth_l1_loss(current_q_values, expected_q_values)
# backpropagation of loss to NN --> optimize model
optimizer.zero_grad()
loss.backward()
optimizer.step()
return loss, np.sum(expected_q_values.numpy())
def train(episodes):
scores = []
Losses = []
Bellman = []
Epsilon = []
Times = []
Deltas = []
for episode in range(episodes):
state=env.reset()#reset environment
print('\n')
print('episode',episode)
epsilon_action = predict_action(torch.FloatTensor([state]))
action = epsilon_action[1] #after each time step predict action
next_state, reward, done,info = env.step(action.item())#step through environment using chosen action
epsilon = epsilon_action[0]
Epsilon.append(epsilon)
print(reward,'reward')
state=next_state
Times.append(state)
scores.append(reward)
memory.push((torch.FloatTensor([state]),action,torch.FloatTensor([next_state]),
torch.FloatTensor([reward])))#action is already a tensor
up = update_policy()#update_policy()#update policy
if up != None:
Losses.append(Variable(up[0]))
print('loss',Variable(up[0]))
Bellman.append(up[1])
#calculate score to determine when the environment has been solved
mean_score=np.mean(scores[-50:])#mean of score of last 50 episodes
#every 50th episode print score
if episode%50 == 0:
print('Episode {}\tScore: {}\tAverage score(last 50 episodes): {:.2f}'.format(episode,scores[-50:],mean_score))
#print('Losses',Losses)
Losses = torch.stack(Losses).numpy()
#print('Losses',Losses)
plt.plot(np.arange(len(Losses)),Losses)
plt.xlabel('Training iterations')
plt.ylabel('Loss')
plt.show()
Bellman = np.array(Bellman)
#print('Bellman',Bellman,'\n')
plt.plot(np.arange(len(Bellman)),Bellman)
plt.xlabel('Training iterations')
plt.ylabel('Bellman target')
plt.show()
#print('scores',scores)
plt.plot(np.arange(len(scores)),scores)
plt.xlabel('Training iterations')
plt.ylabel('Reward')
plt.show()
#print('epsilon',Epsilon)
plt.plot(np.arange(len(Epsilon)),Epsilon)
plt.xlabel('Training iterations')
plt.ylabel('Epsilon')
plt.show()
print('Times',Times[-25:])
print('Deltas',Deltas[-25:])
Times = np.array(Times)
print('Times',Times)
#plt.figure(figsize=(31,20))
plt.figure(figsize=(9,7))
plt.plot(np.arange(len(Times)),(np.array(Times)))
plt.xlabel('Training iterations')
plt.ylabel('t')
plt.show()
Times_1 = np.array(Times[-300:])
print('t',Times)
plt.figure(figsize=(9,7))
plt.plot(np.arange(len(Times_1)),(np.array(Times_1)))
plt.xlabel('Last 300 Training iterations')
plt.ylabel('t')
plt.ylim(0,1000)
plt.show()
model = DqnNet()#policy
memory = ReplayMemory(20000)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
train(10000)