I made a multi-threaded version of DQN, but it seems to train slower than normal one. I can’t find the bug, where is it? I’m new in multiprocessing.
wrappers.py:
import gym
import numpy as np
import PIL.Image
from collections import deque
class ActionSkip(gym.Wrapper):
def __init__(self, env, n_skips=4):
super(ActionSkip, self).__init__(env)
self.observation_space = gym.spaces.Box(low=0, high=255, shape=(210, 160, 3), dtype=np.uint8)
self.n_skips = n_skips
def step(self, action):
queue = []
all_rewards = 0
for _ in range(self.n_skips):
obs, reward, done, _ = self.env.step(action)
queue.append(obs)
all_rewards += reward
if done:
break
return np.max(np.stack(queue), axis=0), all_rewards, done, {}
def reset(self, **kwargs):
return self.env.reset()
class ImageFormat(gym.ObservationWrapper):
def __init__(self, env):
super(ImageFormat, self).__init__(env)
self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(1, 84, 84), dtype=np.float64)
def observation(self, observation):
obs_img = PIL.Image.fromarray(observation)
obs_resize = obs_img.resize((84, 84))
obs_transpose = np.asarray(obs_resize).mean(axis=-1)[None, ...]
return obs_transpose / 127.5 - 1.
class ResetEnv(gym.Wrapper):
def __init__(self, env=None):
super(ResetEnv, self).__init__(env)
self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(1, 84, 84), dtype=np.float64)
def fire(self):
for _ in range(15):
obs, r, done, _ = self.env.step(self.env.action_space.sample())
if done:
return self.reset()
return obs
def step(self, action):
traj = self.env.step(action)
return traj
def reset(self):
self.env.reset()
return self.fire()
class FrameStack(gym.ObservationWrapper):
def __init__(self, env):
super(FrameStack, self).__init__(env)
self.queue = deque(maxlen=6)
self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(6, 84, 84), dtype=np.float64)
def observation(self, observation):
self.queue.append(observation)
obs_stack = np.zeros((6, 84, 84))
for idx, element in enumerate(self.queue):
obs_stack[idx] = element[0]
return obs_stack
def reset(self, **kwargs):
self.queue.clear()
obs = self.env.reset()
obs_stack = np.zeros((6, 84, 84))
obs_stack[0] = obs[0]
return obs_stack
def wrap_env(env):
return FrameStack(ResetEnv(ImageFormat(ActionSkip(env))))
model.py
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
class NoisyLinear(nn.Linear):
def __init__(self, in_features, out_features, bias=True, noise_level=.017):
super(NoisyLinear, self).__init__(in_features, out_features, bias=bias)
self.sigma_weight = nn.Parameter(torch.full((out_features, in_features), noise_level))
self.register_buffer("epsilon_weight", torch.zeros(out_features, in_features))
if bias:
self.sigma_bias = nn.Parameter(torch.full((out_features,), noise_level))
self.register_buffer("epsilon_bias", torch.zeros(out_features))
def forward(self, x):
self.epsilon_weight.data.normal_(0., 1.)
w_noise = self.epsilon_weight * self.sigma_weight
weight = self.weight + w_noise
if self.bias is not None:
self.epsilon_bias.data.normal_(0., 1.)
b_noise = self.epsilon_bias * self.sigma_bias
bias = self.bias + b_noise
else:
bias = None
return F.linear(x, weight, bias)
class ConvModel(nn.Module):
def __init__(self, input_shape, n_actions):
super(ConvModel, self).__init__()
self.conv = nn.Sequential(
nn.Conv2d(in_channels=input_shape[0], out_channels=16, kernel_size=(8, 8), stride=(4, 4), padding=(0, 0)), nn.LeakyReLU(.2),
nn.Conv2d(in_channels=16, out_channels=32, kernel_size=(4, 4), stride=(2, 2), padding=(0, 0)), nn.LeakyReLU(.2),
nn.Flatten())
conv_out_size = int(np.prod(self.conv(torch.zeros(1, *input_shape)).shape))
self.fc = nn.Sequential(
NoisyLinear(in_features=conv_out_size, out_features=256), nn.LeakyReLU(.2),
NoisyLinear(in_features=256, out_features=n_actions))
def forward(self, x):
return self.fc(self.conv(x))
common.py
import collections
import multiprocessing
import numpy as np
import torch
import torch.nn.functional as F
Trajectory = collections.namedtuple("Trajectory", field_names=("step", "state", "action", "reward", "next_state",
"done", "info", "total_rewards"))
Episode = collections.namedtuple("Episode", field_names=("steps", "reward"))
class Agent(object):
def __init__(self, net, action_spaces, epsilon, device):
super(Agent, self).__init__()
self.net = net
self.action_spaces = action_spaces
self.epsilon = epsilon
self.device = device
@torch.no_grad()
def __call__(self, obs):
if np.random.rand() > self.epsilon:
obs_v = torch.FloatTensor(obs).to(self.device)
net_out = self.net(obs_v).data.cpu().numpy()
actions = np.argmax(net_out, axis=1).tolist()
else:
actions = [action_space.sample() for action_space in self.action_spaces]
return actions
class ExpIterator(object):
def __init__(self, envs, agent, gamma, n_step_unroll=1):
super(ExpIterator, self).__init__()
self.envs = envs
self.agent = agent
self.gamma = gamma
self.n_step_unroll = n_step_unroll
self.dones = [True for _ in envs]
self.obs = [None for _ in envs]
self.steps = [0. for _ in envs]
self.episode_rewards = [0. for _ in envs]
self.buffer = collections.deque(maxlen=n_step_unroll)
def __call__(self):
once = True
while len(self.buffer) < self.n_step_unroll or once:
once = False
for d_idx, done in enumerate(self.dones):
if done:
self.dones[d_idx] = False
self.obs[d_idx] = self.envs[d_idx].reset()
self.steps[d_idx] = 0.
self.episode_rewards[d_idx] = 0.
actions = self.agent(np.stack(self.obs))
trajs_raw = [env.step(action) for env, action in zip(self.envs, actions)]
trajectories = []
for t_idx, traj_raw in enumerate(trajs_raw):
self.episode_rewards[t_idx] += traj_raw[1]
self.steps[t_idx] += 1
trajectories.append(Trajectory(step=self.steps[t_idx], state=self.obs[t_idx], action=actions[t_idx],
reward=traj_raw[1], next_state=traj_raw[0], done=traj_raw[2],
info=traj_raw[3], total_rewards=self.episode_rewards[t_idx]))
self.dones[t_idx] = traj_raw[2]
self.obs[t_idx] = traj_raw[0]
self.buffer.append(trajectories)
unroll_trajectories = []
for traj_idx in range(len(self.envs)):
unroll_reward = 0.
for unroll_idx, unroll in enumerate(self.buffer):
unroll_reward += unroll[traj_idx].reward*self.gamma**unroll_idx
if unroll[traj_idx].done:
break
unroll_trajectories.append(Trajectory(step=self.buffer[0][traj_idx].step, state=self.buffer[0][traj_idx].state,
action=self.buffer[0][traj_idx].action, reward=unroll_reward,
next_state=self.buffer[0][traj_idx].next_state, done=self.buffer[0][traj_idx].done,
info=self.buffer[0][traj_idx].info, total_rewards=self.buffer[0][traj_idx].total_rewards))
return unroll_trajectories
def __iter__(self):
while True:
yield self()
class Buffer(object):
def __init__(self, buffer_capacity, exp_source):
super(Buffer, self).__init__()
self.core = collections.deque(maxlen=buffer_capacity)
self.exp_source = exp_source
def populate(self, n):
for idx, trajectories in enumerate(self.exp_source):
for trajectory in trajectories:
self.core.append(trajectory)
if idx + 1 >= n:
break
def sample(self, batch_size):
indices = np.arange(len(self.core))
np.random.shuffle(indices)
return [self.core[idx] for idx in indices[:batch_size]]
def iterate(self, initial, sample_interval, batch_size):
self.populate(initial)
while True:
episodes = []
for trajectory in list(self.core)[-self.exp_source.n_step_unroll:]:
if trajectory.done:
episodes.append(Episode(steps=trajectory.step, reward=trajectory.total_rewards))
yield self.sample(batch_size), episodes
self.populate(sample_interval)
def calculate_loss(batch, net, tgt_net, gamma, device="cuda"):
states, actions, rewards, next_states, dones = [], [], [], [], []
for trajectory in batch:
states.append(np.array(trajectory.state, dtype=np.float32, copy=False))
actions.append(int(trajectory.action))
rewards.append(float(trajectory.reward))
next_states.append(np.array(trajectory.next_state, dtype=np.float32, copy=False))
dones.append(bool(trajectory.done))
states_v = torch.FloatTensor(np.array(states, dtype=np.float32, copy=False)).to(device)
next_states_v = torch.FloatTensor(np.array(next_states, dtype=np.float32, copy=False)).to(device)
rewards_v = torch.FloatTensor(rewards).to(device)
with torch.no_grad():
next_actions = net(next_states_v).data.cpu().numpy().argmax(axis=1)
next_values_v = tgt_net(next_states_v)[range(next_states_v.shape[0]), next_actions]
bellman_values = rewards_v + next_values_v*gamma
values_predicted_v = net(states_v)[range(states_v.shape[0]), actions]
return F.mse_loss(values_predicted_v, bellman_values)
def play_process(exp_source, queue):
for exp in exp_source:
queue.put(exp)
main.py:
import time
import model
import wrappers
import common
import torch
import torch.optim as optim
import torch.multiprocessing as mp
from tensorboardX import SummaryWriter
import gym
ENV_NAME = "PongNoFrameskip-v4"
DEVICE = torch.device("cuda")
N_ENVS = 3
BUFFER_CAPACITY = 10000
INITIAL_STEPS = 100
SAMPLE_INTERVAL = 1
N_STEP_UNROLL = 4
GAMMA = .99
BATCH_SIZE = 32*N_ENVS
LR = 1e-4
EPSILON_START = 1.
EPSILON_FINAL = .01
EPSILON_STEPS = 75000
TGT_NET_SYNC = 1000
if __name__ == "__main__":
mp.set_start_method("spawn")
envs = [wrappers.wrap_env(gym.make(ENV_NAME)) for _ in range(N_ENVS)]
net = model.ConvModel(envs[0].observation_space.shape, envs[0].action_space.n).to(DEVICE)
opt = optim.Adam(net.parameters(), lr=1e-4)
tgt_net = model.ConvModel(envs[0].observation_space.shape, envs[0].action_space.n).to(DEVICE)
agent = common.Agent(net, [env.action_space for env in envs], EPSILON_START, DEVICE)
exp_source = common.ExpIterator(envs, agent, GAMMA, N_STEP_UNROLL)
replay_buffer = common.Buffer(BUFFER_CAPACITY, exp_source)
net.share_memory()
tgt_net.share_memory()
queue = mp.Queue(maxsize=8)
writer = SummaryWriter(comment="PongDoubleNoisy")
process = mp.Process(target=common.play_process, args=(exp_source, queue))
process.start()
b_idx = 0
while True:
while queue.qsize():
start = time.time_ns()
train_entry = queue.get()
episodes = []
for traj in train_entry:
replay_buffer.core.append(traj)
if traj.done:
episodes.append(common.Episode(steps=traj.step, reward=traj.total_rewards))
batch = replay_buffer.sample(BATCH_SIZE*N_ENVS)
if not b_idx % TGT_NET_SYNC:
tgt_net.load_state_dict(net.state_dict())
agent.epsilon = max(EPSILON_FINAL, agent.epsilon - SAMPLE_INTERVAL*(EPSILON_START - EPSILON_FINAL)/EPSILON_STEPS)
opt.zero_grad()
loss_v = common.calculate_loss(batch, net, tgt_net, GAMMA, DEVICE)
loss_v.backward()
opt.step()
writer.add_scalar("loss", loss_v, b_idx)
writer.add_scalar("epsilon", agent.epsilon, b_idx)
writer.add_scalar("fps", 1e9/(time.time_ns()-start), b_idx)
for episode in episodes:
writer.add_scalar("reward", episode.reward, b_idx)
writer.add_scalar("steps", episode.steps, b_idx)
print(episode.reward, episode.steps)
b_idx += 1