Parallelized DQN trains slower than the single-process version

I made a multiprocessing version of DQN (one actor process that feeds transitions to the training process through a queue), but it seems to train slower than the single-process one. I can't find the bug; where is it? I'm new to multiprocessing.
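
To make the layout clearer, this is roughly the structure I'm aiming for (a minimal sketch with a dummy experience source, not the actual files below):

import torch.multiprocessing as mp


def actor(queue):
    # producer: in the real code this loops over an experience source and pushes transitions
    for step in range(1000):
        queue.put(step)
    queue.put(None)  # sentinel so the consumer knows the producer finished


if __name__ == "__main__":
    mp.set_start_method("spawn")
    queue = mp.Queue(maxsize=8)
    proc = mp.Process(target=actor, args=(queue,))
    proc.start()
    while True:
        item = queue.get()  # consumer: in the real code this feeds the replay buffer and one training step
        if item is None:
            break
    proc.join()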

wrappers.py:

import gym
import numpy as np
import PIL.Image
from collections import deque


class ActionSkip(gym.Wrapper):
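    # repeat the chosen action for n_skips frames and return the pixel-wise max over the collected frames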
    def __init__(self, env, n_skips=4):
        super(ActionSkip, self).__init__(env)
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(210, 160, 3), dtype=np.uint8)

        self.n_skips = n_skips

    def step(self, action):
        queue = []
        all_rewards = 0
        for _ in range(self.n_skips):
            obs, reward, done, _ = self.env.step(action)
            queue.append(obs)
            all_rewards += reward
            if done:
                break
        return np.max(np.stack(queue), axis=0), all_rewards, done, {}

    def reset(self, **kwargs):
        return self.env.reset()


class ImageFormat(gym.ObservationWrapper):
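    # resize to 84x84, average the RGB channels to grayscale, and rescale pixels to [-1, 1]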
    def __init__(self, env):
        super(ImageFormat, self).__init__(env)
        self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(1, 84, 84), dtype=np.float64)

    def observation(self, observation):
        obs_img = PIL.Image.fromarray(observation)
        obs_resize = obs_img.resize((84, 84))
        obs_transpose = np.asarray(obs_resize).mean(axis=-1)[None, ...]
        return obs_transpose / 127.5 - 1.


class ResetEnv(gym.Wrapper):
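    # after a reset, take up to 15 random actions to get the game going before handing control back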
    def __init__(self, env=None):
        super(ResetEnv, self).__init__(env)
        self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(1, 84, 84), dtype=np.float64)

    def fire(self):
        for _ in range(15):
            obs, r, done, _ = self.env.step(self.env.action_space.sample())
            if done:
                return self.reset()
        return obs

    def step(self, action):
        traj = self.env.step(action)
        return traj

    def reset(self):
        self.env.reset()
        return self.fire()


class FrameStack(gym.ObservationWrapper):
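    # keep the last 6 processed frames in a deque and expose them as a (6, 84, 84) observation (missing slots stay zero)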
    def __init__(self, env):
        super(FrameStack, self).__init__(env)
        self.queue = deque(maxlen=6)
        self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(6, 84, 84), dtype=np.float64)

    def observation(self, observation):
        self.queue.append(observation)
        obs_stack = np.zeros((6, 84, 84))
        for idx, element in enumerate(self.queue):
            obs_stack[idx] = element[0]
        return obs_stack

    def reset(self, **kwargs):
        self.queue.clear()
        obs = self.env.reset()
        # push the reset frame through observation() so it is stored in the queue for later stacks
        return self.observation(obs)


def wrap_env(env):
    return FrameStack(ResetEnv(ImageFormat(ActionSkip(env))))

model.py:

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F


class NoisyLinear(nn.Linear):
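    # linear layer with learnable Gaussian noise on the weights and bias (NoisyNet-style exploration)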
    def __init__(self, in_features, out_features, bias=True, noise_level=.017):
        super(NoisyLinear, self).__init__(in_features, out_features, bias=bias)
        self.sigma_weight = nn.Parameter(torch.full((out_features, in_features), noise_level))
        self.register_buffer("epsilon_weight", torch.zeros(out_features, in_features))

        if bias:
            self.sigma_bias = nn.Parameter(torch.full((out_features,), noise_level))
            self.register_buffer("epsilon_bias", torch.zeros(out_features))

    def forward(self, x):
        self.epsilon_weight.data.normal_(0., 1.)
        w_noise = self.epsilon_weight * self.sigma_weight
        weight = self.weight + w_noise

        if self.bias is not None:
            self.epsilon_bias.data.normal_(0., 1.)
            b_noise = self.epsilon_bias * self.sigma_bias
            bias = self.bias + b_noise
        else:
            bias = None

        return F.linear(x, weight, bias)


class ConvModel(nn.Module):
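    # two conv layers followed by two noisy linear layers; outputs one Q-value per action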
    def __init__(self, input_shape, n_actions):
        super(ConvModel, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels=input_shape[0], out_channels=16, kernel_size=(8, 8), stride=(4, 4), padding=(0, 0)), nn.LeakyReLU(.2),
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=(4, 4), stride=(2, 2), padding=(0, 0)), nn.LeakyReLU(.2),
            nn.Flatten())

        conv_out_size = int(np.prod(self.conv(torch.zeros(1, *input_shape)).shape))

        self.fc = nn.Sequential(
            NoisyLinear(in_features=conv_out_size, out_features=256), nn.LeakyReLU(.2),
            NoisyLinear(in_features=256, out_features=n_actions))
    
    def forward(self, x):
        return self.fc(self.conv(x))

common.py:

import collections
import multiprocessing

import numpy as np
import torch
import torch.nn.functional as F


Trajectory = collections.namedtuple("Trajectory", field_names=("step", "state", "action", "reward", "next_state",
                                                               "done", "info", "total_rewards"))
Episode = collections.namedtuple("Episode", field_names=("steps", "reward"))


class Agent(object):
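    # epsilon-greedy policy over a batch of observations: random actions with probability epsilon, otherwise argmax of the net output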
    def __init__(self, net, action_spaces, epsilon, device):
        super(Agent, self).__init__()
        self.net = net
        self.action_spaces = action_spaces
        self.epsilon = epsilon
        self.device = device

    @torch.no_grad()
    def __call__(self, obs):
        if np.random.rand() > self.epsilon:
            obs_v = torch.FloatTensor(obs).to(self.device)
            net_out = self.net(obs_v).data.cpu().numpy()
            actions = np.argmax(net_out, axis=1).tolist()
        else:
            actions = [action_space.sample() for action_space in self.action_spaces]
        return actions


class ExpIterator(object):
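    # steps a list of environments with the agent and yields per-env transitions with rewards discounted over the n-step window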
    def __init__(self, envs, agent, gamma, n_step_unroll=1):
        super(ExpIterator, self).__init__()
        self.envs = envs
        self.agent = agent
        self.gamma = gamma
        self.n_step_unroll = n_step_unroll

        self.dones = [True for _ in envs]
        self.obs = [None for _ in envs]

        self.steps = [0. for _ in envs]
        self.episode_rewards = [0. for _ in envs]

        self.buffer = collections.deque(maxlen=n_step_unroll)

    def __call__(self):
        once = True

        while len(self.buffer) < self.n_step_unroll or once:
            once = False

            for d_idx, done in enumerate(self.dones):
                if done:
                    self.dones[d_idx] = False
                    self.obs[d_idx] = self.envs[d_idx].reset()
                    self.steps[d_idx] = 0.
                    self.episode_rewards[d_idx] = 0.

            actions = self.agent(np.stack(self.obs))
            trajs_raw = [env.step(action) for env, action in zip(self.envs, actions)]

            trajectories = []

            for t_idx, traj_raw in enumerate(trajs_raw):
                self.episode_rewards[t_idx] += traj_raw[1]
                self.steps[t_idx] += 1

                trajectories.append(Trajectory(step=self.steps[t_idx], state=self.obs[t_idx], action=actions[t_idx],
                                               reward=traj_raw[1], next_state=traj_raw[0], done=traj_raw[2],
                                               info=traj_raw[3], total_rewards=self.episode_rewards[t_idx]))

                self.dones[t_idx] = traj_raw[2]
                self.obs[t_idx] = traj_raw[0]

            self.buffer.append(trajectories)

        unroll_trajectories = []
        for traj_idx in range(len(self.envs)):
            unroll_reward = 0.
            for unroll_idx, unroll in enumerate(self.buffer):
                unroll_reward += unroll[traj_idx].reward*self.gamma**unroll_idx
                if unroll[traj_idx].done:
                    break

            unroll_trajectories.append(Trajectory(step=self.buffer[0][traj_idx].step, state=self.buffer[0][traj_idx].state,
                                                  action=self.buffer[0][traj_idx].action, reward=unroll_reward,
                                                  next_state=self.buffer[0][traj_idx].next_state, done=self.buffer[0][traj_idx].done,
                                                  info=self.buffer[0][traj_idx].info, total_rewards=self.buffer[0][traj_idx].total_rewards))
        return unroll_trajectories

    def __iter__(self):
        while True:
            yield self()


class Buffer(object):
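    # simple replay buffer backed by a deque; populate() pulls from the experience source, sample() draws a random batch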
    def __init__(self, buffer_capacity, exp_source):
        super(Buffer, self).__init__()
        self.core = collections.deque(maxlen=buffer_capacity)
        self.exp_source = exp_source

    def populate(self, n):
        for idx, trajectories in enumerate(self.exp_source):
            for trajectory in trajectories:
                self.core.append(trajectory)
            if idx + 1 >= n:
                break

    def sample(self, batch_size):
        indices = np.arange(len(self.core))
        np.random.shuffle(indices)
        return [self.core[idx] for idx in indices[:batch_size]]

    def iterate(self, initial, sample_interval, batch_size):
        self.populate(initial)
        while True:
            episodes = []
            for trajectory in list(self.core)[-self.exp_source.n_step_unroll:]:
                if trajectory.done:
                    episodes.append(Episode(steps=trajectory.step, reward=trajectory.total_rewards))

            yield self.sample(batch_size), episodes
            self.populate(sample_interval)


def calculate_loss(batch, net, tgt_net, gamma, device="cuda"):
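    # Double DQN target: the online net picks the next action, the target net evaluates it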
    states, actions, rewards, next_states, dones = [], [], [], [], []

    for trajectory in batch:
        states.append(np.array(trajectory.state, dtype=np.float32, copy=False))
        actions.append(int(trajectory.action))
        rewards.append(float(trajectory.reward))
        next_states.append(np.array(trajectory.next_state, dtype=np.float32, copy=False))
        dones.append(bool(trajectory.done))

    states_v = torch.FloatTensor(np.array(states, dtype=np.float32, copy=False)).to(device)
    next_states_v = torch.FloatTensor(np.array(next_states, dtype=np.float32, copy=False)).to(device)
    rewards_v = torch.FloatTensor(rewards).to(device)
    dones_v = torch.BoolTensor(dones).to(device)

    with torch.no_grad():
        next_actions = net(next_states_v).data.cpu().numpy().argmax(axis=1)
        next_values_v = tgt_net(next_states_v)[range(next_states_v.shape[0]), next_actions]
        next_values_v[dones_v] = 0.0  # no bootstrap value for terminal transitions
        bellman_values = rewards_v + next_values_v*gamma

    values_predicted_v = net(states_v)[range(states_v.shape[0]), actions]
    return F.mse_loss(values_predicted_v, bellman_values)


def play_process(exp_source, queue):
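    # runs in the actor process: endlessly pulls transitions from the experience source and pushes them into the queue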
    for exp in exp_source:
        queue.put(exp)

main.py:

import time

import model
import wrappers
import common

import torch
import torch.optim as optim
import torch.multiprocessing as mp

from tensorboardX import SummaryWriter
import gym


ENV_NAME = "PongNoFrameskip-v4"
DEVICE = torch.device("cuda")

N_ENVS = 3
BUFFER_CAPACITY = 10000
INITIAL_STEPS = 100
SAMPLE_INTERVAL = 1

N_STEP_UNROLL = 4
GAMMA = .99
BATCH_SIZE = 32*N_ENVS
LR = 1e-4

EPSILON_START = 1.
EPSILON_FINAL = .01
EPSILON_STEPS = 75000

TGT_NET_SYNC = 1000


if __name__ == "__main__":
    mp.set_start_method("spawn")

    envs = [wrappers.wrap_env(gym.make(ENV_NAME)) for _ in range(N_ENVS)]
    net = model.ConvModel(envs[0].observation_space.shape, envs[0].action_space.n).to(DEVICE)
    opt = optim.Adam(net.parameters(), lr=LR)
    tgt_net = model.ConvModel(envs[0].observation_space.shape, envs[0].action_space.n).to(DEVICE)
    agent = common.Agent(net, [env.action_space for env in envs], EPSILON_START, DEVICE)
    exp_source = common.ExpIterator(envs, agent, GAMMA, N_STEP_UNROLL)
    replay_buffer = common.Buffer(BUFFER_CAPACITY, exp_source)

    net.share_memory()
    tgt_net.share_memory()

    queue = mp.Queue(maxsize=8)
    writer = SummaryWriter(comment="PongDoubleNoisy")

    process = mp.Process(target=common.play_process, args=(exp_source, queue))
    process.start()

    b_idx = 0
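    # training loop: drain the queue, add transitions to the replay buffer, and run one optimization step per entry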
    while True:
        while queue.qsize():
            start = time.time_ns()

            train_entry = queue.get()

            episodes = []
            for traj in train_entry:
                replay_buffer.core.append(traj)
                if traj.done:
                    episodes.append(common.Episode(steps=traj.step, reward=traj.total_rewards))
            batch = replay_buffer.sample(BATCH_SIZE)

            if not b_idx % TGT_NET_SYNC:
                tgt_net.load_state_dict(net.state_dict())

            agent.epsilon = max(EPSILON_FINAL, agent.epsilon - SAMPLE_INTERVAL*(EPSILON_START - EPSILON_FINAL)/EPSILON_STEPS)

            opt.zero_grad()
            loss_v = common.calculate_loss(batch, net, tgt_net, GAMMA, DEVICE)
            loss_v.backward()
            opt.step()

            writer.add_scalar("loss", loss_v, b_idx)
            writer.add_scalar("epsilon", agent.epsilon, b_idx)
            writer.add_scalar("fps", 1e9/(time.time_ns()-start), b_idx)

            for episode in episodes:
                writer.add_scalar("reward", episode.reward, b_idx)
                writer.add_scalar("steps", episode.steps, b_idx)

                print(episode.reward, episode.steps)

            b_idx += 1