I was trying to re-implement the Cliff Walking game in Sutton’s book. A simple way is to transfer the official REINFORCE example to this game. Therefore, I directly apply the algorithm given in https://github.com/pytorch/examples/edit/master/reinforcement_learning/reinforce.py. However, it seems that the model doesn’t learn well in this simple setting. I was just wondering where goes wrong?
import argparse
import numpy as np
from itertools import count
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
class CliffWalking(object):
def __init__(self, step_limit=None):
self.shape = (4, 12)
# always start from the left-dow corner
# self.pos = np.asarray([self.shape[0] - 1, 0])
self.pos = np.asarray([2, 11])
# build a
self.cliff = np.zeros(self.shape, dtype=np.bool)
self.cliff[-1, 1:-1] = 1
self.actions = {
'U': 0,
'D': 1,
'L': 2,
'R': 3
}
self.action2shift = {
0: [-1, 0],
1: [1, 0],
2: [0, -1],
3: [0, 1]
}
self.transmit_tensor = self._build_transmit_tensor_()
self.num_actions = len(self.actions)
self.state_dim = 2
def _build_transmit_tensor_(self):
trans_matrix = [[[] for _ in range(self.shape[1])] for __ in range(self.shape[0])]
for i in range(self.shape[0]):
for j in range(self.shape[1]):
for a in range(len(self.actions)):
trans_matrix[i][j].append(self._cal_new_position_((i, j), a))
return trans_matrix
def _cal_new_position_(self, old_pos, action):
old_pos = np.asarray(old_pos)
new_pos = old_pos + self.action2shift[action]
new_pos[0] = max(new_pos[0], 0)
new_pos[0] = min(new_pos[0], self.shape[0] - 1)
new_pos[1] = max(new_pos[1], 0)
new_pos[1] = min(new_pos[1], self.shape[1] - 1)
reward = -1.
terminate = False
if self.cliff[old_pos[0]][old_pos[1]]:
reward = -100.
new_pos[0] = self.shape[0] - 1
new_pos[1] = 0
terminate = True
if old_pos[0] == self.shape[0] - 1 and old_pos[1] == self.shape[1] - 1:
terminate = True
new_pos[0] = self.shape[0] - 1
new_pos[1] = self.shape[1] - 1
return new_pos, reward, terminate
def take_action(self, action):
if isinstance(action, str):
new_pos, reward, terminate = self.transmit_tensor[self.pos[0]][self.pos[1]][self.actions[action]]
else:
new_pos, reward, terminate = self.transmit_tensor[self.pos[0]][self.pos[1]][action]
self.pos[0] = new_pos[0]
self.pos[1] = new_pos[1]
self.steps += 1
return new_pos, reward, terminate
def show_pos(self):
env = np.zeros(self.shape)
env[self.pos[0]][self.pos[1]] = 1
print(env)
def reset(self):
# self.pos = np.asarray([self.shape[0] - 1, 0])
self.pos = np.asarray([2, 10])
self.steps = 0
return self.pos
parser = argparse.ArgumentParser(description='PyTorch REINFORCE example')
parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
help='discount factor (default: 0.99)')
parser.add_argument('--seed', type=int, default=543, metavar='N',
help='random seed (default: 543)')
parser.add_argument('--render', action='store_true',
help='render the environment')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
help='interval between training status logs (default: 10)')
args = parser.parse_args()
env = CliffWalking()
torch.manual_seed(args.seed)
class Policy(nn.Module):
def __init__(self):
super(Policy, self).__init__()
self.affine1 = nn.Linear(2, 48)
self.affine2 = nn.Linear(48, 4)
self.saved_log_probs = []
self.rewards = []
def forward(self, x):
x = F.relu(self.affine1(x))
action_scores = self.affine2(x)
return F.softmax(action_scores, dim=1)
policy = Policy()
optimizer = optim.Adam(policy.parameters(), lr=1e-2)
eps = np.finfo(np.float32).eps.item()
def select_action(state):
state = torch.from_numpy(state).float().unsqueeze(0)
probs = policy(state)
m = Categorical(probs)
action = m.sample()
policy.saved_log_probs.append(m.log_prob(action))
return action.item()
def finish_episode():
R = 0
policy_loss = []
rewards = []
for r in policy.rewards[::-1]:
R = r + args.gamma * R
rewards.insert(0, R)
rewards = torch.tensor(rewards)
rewards = (rewards - rewards.mean()) / (rewards.std() + eps)
for log_prob, reward in zip(policy.saved_log_probs, rewards):
policy_loss.append(-log_prob * reward)
optimizer.zero_grad()
policy_loss = torch.cat(policy_loss).sum()
policy_loss.backward()
optimizer.step()
del policy.rewards[:]
del policy.saved_log_probs[:]
def main():
for i_episode in count(1):
state = env.reset()
done = False
while not done: # Don't infinite loop while learning
action = select_action(state)
state, reward, done = env.take_action(action)
policy.rewards.append(reward)
finish_episode()
if i_episode % 10 == 0 and (not i_episode == 0):
scores = []
for i in range(100):
terminate = False
state = env.reset()
score = 0
while not terminate:
action = select_action(state)
next_state, reward, terminate = env.take_action(action)
score += reward
scores.append(score)
print('[test score]episode %d: %.2f' % (i_episode, np.mean(np.asarray(scores))))
if __name__ == '__main__':
main()