import gym
import random
import collections
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
#Hyperparameters
lr_mu = 0.0005 # learning rate of Actor
lr_q = 0.001 # learning rate of Critic
gamma = 0.99
batch_size = 32
buffer_limit = 50000
tau = 0.005 # for target network soft update
class ReplayBuffer():
def __init__(self):
self.buffer = collections.deque(maxlen=buffer_limit)
def put(self, data):
self.buffer.append(data)
def sample(self, n):
mini_batch = random.sample(self.buffer, n)
s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []
for transition in mini_batch:
s, a, r, s_prime, done_mask = transition
s_lst.append(s)
a_lst.append([a])
r_lst.append([r])
s_prime_lst.append(s_prime)
done_mask_lst.append([done_mask])
return torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
torch.tensor(done_mask_lst)
def size(self):
return len(self.buffer)
class MuNet(nn.Module): #
def __init__(self):
super(MuNet, self).__init__()
self.fc1 = nn.Linear(3, 128)
self.fc2 = nn.Linear(128, 64)
self.fc_mu = nn.Linear(64, 1)
def forward(self, x):
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
mu = torch.tanh(self.fc_mu(x))*2 # Multipled by 2 because the action space of the Pendulum-v0 is [-2,2]
return mu
class QNet(nn.Module): #score(state, action)
enters the network in middle layer
def __init__(self):
super(QNet, self).__init__()
self.fc_s = nn.Linear(3, 64) #encodes the state
self.fc_a = nn.Linear(1,64) #encodes the actions
self.fc_q = nn.Linear(128, 32) #s+a = 128
self.fc_3 = nn.Linear(32,1) # output
def forward(self, x, a):
h1 = F.relu(self.fc_s(x))
h2 = F.relu(self.fc_a(a))
cat = torch.cat([h1,h2], dim=1)
q = F.relu(self.fc_q(cat))
q = self.fc_3(q)
return q
class OrnsteinUhlenbeckNoise:
def __init__(self, mu):
self.theta, self.dt, self.sigma = 0.1, 0.01, 0.1
self.mu = mu
self.x_prev = np.zeros_like(self.mu)
def __call__(self):
x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + \
self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape)
self.x_prev = x
return x
def train(mu, mu_target, q, q_target, memory, q_optimizer, mu_optimizer):
s,a,r,s_prime,done_mask = memory.sample(batch_size)
target = r + gamma * q_target(s_prime, mu_target(s_prime))
q_loss = F.smooth_l1_loss(q(s,a), target.detach())
q_optimizer.zero_grad()
q_loss.backward()
q_optimizer.step()
mu_loss = -q(s,mu(s)).mean() # That's all for the policy loss.
mu_optimizer.zero_grad()
mu_loss.backward()
mu_optimizer.step()
def soft_update(net, net_target):
for param_target, param in zip(net_target.parameters(), net.parameters()):
param_target.data.copy_(param_target.data * (1.0 - tau) + param.data * tau)
def main():
env = gym.make(‘Pendulum-v0’)
memory = ReplayBuffer()
q, q_target = QNet(), QNet()
q_target.load_state_dict(q.state_dict())
mu, mu_target = MuNet(), MuNet()
mu_target.load_state_dict(mu.state_dict())
score = 0.0
print_interval = 20
min_play_reward = 0
iterations = 100000
mu_optimizer = optim.Adam(mu.parameters(), lr=lr_mu)
q_optimizer = optim.Adam(q.parameters(), lr=lr_q)
ou_noise = OrnsteinUhlenbeckNoise(mu=np.zeros(1))
def play_game():
done = False
state = env.reset()
while(not done):
a = mu(torch.from_numpy(s).float())
a = a.item() + ou_noise()[0]
s_prime, r, done, info = env.step([a])
env.render()
state = s_prime
for iteration in range(iterations):
s = env.reset()
for t in range(300): # maximum length of episode is 200 for Pendulum-v0
a = mu(torch.from_numpy(s).float())
a = a.item() + ou_noise()[0]
s_prime, r, done, info = env.step([a])
memory.put((s,a,r/100.0,s_prime,done))
score +=r
s = s_prime
if done:
if score/print_interval > min_play_reward:
play_game()
break
if memory.size()>2000:
train(mu, mu_target, q, q_target, memory, q_optimizer, mu_optimizer)
soft_update(mu, mu_target)
soft_update(q, q_target)
if iteration%print_interval==0 and iteration!=0:
print("# of episode :{}, avg score : {:.1f}".format(iteration, score/print_interval))
score = 0.0
env.close()
if name == ‘main’:
main()
This is Pendelum project code.
It has some error about type. Could you let me know how can i solve?
RuntimeError Traceback (most recent call last)
in ()
150
151 if memory.size()>2000:
–> 152 train(mu, mu_target, q, q_target, memory, q_optimizer, mu_optimizer)
153 soft_update(mu, mu_target)
154 soft_update(q, q_target)
5 frames
in train(mu, mu_target, q, q_target, memory, q_optimizer, mu_optimizer)
86
87 target = r + gamma * q_target(s_prime, mu_target(s_prime))
—> 88 q_loss = F.smooth_l1_loss(q(s,a), target.detach())
89
90 q_optimizer.zero_grad()
/usr/local/lib/python3.6/dist-packages/torch/nn/modules/module.py in call(self, *input, **kwargs)
548 result = self._slow_forward(*input, **kwargs)
549 else:
–> 550 result = self.forward(*input, **kwargs)
551 for hook in self._forward_hooks.values():
552 hook_result = hook(self, input, result)
in forward(self, x, a)
65 def forward(self, x, a):
66 h1 = F.relu(self.fc_s(x))
—> 67 h2 = F.relu(self.fc_a(a))
68 cat = torch.cat([h1,h2], dim=1)
69 q = F.relu(self.fc_q(cat))
/usr/local/lib/python3.6/dist-packages/torch/nn/modules/module.py in call(self, *input, **kwargs)
548 result = self._slow_forward(*input, **kwargs)
549 else:
–> 550 result = self.forward(*input, **kwargs)
551 for hook in self._forward_hooks.values():
552 hook_result = hook(self, input, result)
/usr/local/lib/python3.6/dist-packages/torch/nn/modules/linear.py in forward(self, input)
85
86 def forward(self, input):
—> 87 return F.linear(input, self.weight, self.bias)
88
89 def extra_repr(self):
/usr/local/lib/python3.6/dist-packages/torch/nn/functional.py in linear(input, weight, bias)
1608 if input.dim() == 2 and bias is not None:
1609 # fused op is marginally faster
-> 1610 ret = torch.addmm(bias, input, weight.t())
1611 else:
1612 output = input.matmul(weight.t())
RuntimeError: Expected object of scalar type Float but got scalar type Double for argument #2 ‘mat1’ in call to _th_addmm
above sentence