RPC does not seem to help with forward time

Using RPC only decreases “backward time”.

Hello, I am using torch.distributed.rpc, modifying the example in examples/main.py at 01539f9eada34aef67ae7b3d674f30634c35f468 · pytorch/examples · GitHub

I have N agents and 1 simulation. I would like to select actions in parallel, if possible on different GPUs.

However, when I execute my code, the only part whose time goes down is the update step. Below is the code for selecting the actions, whose time does not seem to be reduced at all. Any ideas?

from torch.distributed.rpc import RRef, rpc_sync, rpc_async, remote

def _call_method(method, rref, *args, **kwargs):
    r"""
    a helper function to call a method on the given RRef
    """
    return method(rref.local_value(), *args, **kwargs)


def _remote_method(method, rref, *args, **kwargs):
    r"""
    a helper function to run method on the owner of rref and fetch back the
    result using RPC
    """
    args = [method, rref] + list(args)
    return rpc_sync(rref.owner(), _call_method, args=args, kwargs=kwargs)

def select_actions_all_agents(self, state, step):
    # sentinel value -1000 marks "no action reported yet" for each agent
    self.current_actions = np.full(self.current_actions.shape, -1000, dtype=np.int32)
    futs = []
    start_time = time.time()
    for ag_rreff in self.ag_rrefs:
        # make async RPC to kick off action selection on all agents
        futs.append(
            rpc_async(
                ag_rreff.owner(),
                _call_method,
                args=(Agent.select_action, ag_rreff, self.master_rref, state, step)
            )
        )
        # if step % 50 == 0:
        #    print("Done", ag_rreff, time.time()-start_time)
    # wait until all agents have reported their action
    for fut in futs:
        fut.wait()

class Agent:
    def __init__(self):
        self.id = rpc.get_worker_info().id
        device = (self.id - 1) % args.num_gpus
        print("Initialasing Agent ID: {}, Device {}".format(self.id, device))
        self.device = ("cuda:" + str(device) if torch.cuda.is_available() else "cpu")
        self.rewards = []
        self.saved_log_probs = []
        # torch.manual_seed(args.seed+self.id)
        self.policy = Policy()
        self.policy.to(self.device)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
        self.eps = np.finfo(np.float32).eps.item()

    def select_action(self, master_rref, state, step):
        r"""
        This function is mostly borrowed from the Reinforcement Learning example.
        See https://github.com/pytorch/examples/tree/master/reinforcement_learning
        The main difference is that instead of keeping all probs in one list,
        the agent keeps probs in a dictionary, one key per observer.

        NB: no need to enforce thread-safety here as GIL will serialize
        executions.
        """
        start_time = time.time()
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        start_time_policy = time.time()
        probs = self.policy(state)
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs.append(m.log_prob(action))
        _remote_method(Master.report_action, master_rref, self.id,  action.item())

I am not very familiar with the example code here. Are self.ag_rrefs owned by different devices?

@mrshenli Any idea?

Hello,

The code is a modification of the example.

self.ag_rrefs holds remote references to instances of Agent. If I have world-size 3, I create 1 master and 2 Agents. Each agent is assigned to its own GPU.

What I would like to do is to have an agent per GPU and then the environment in the CPU.

"What I would like to do is to have an agent per GPU and then the environment in the CPU."

                rpc_async(
                    ag_rreff.owner(),
                    _call_method,
                    args=(Agent.select_action, ag_rreff, self.master_rref, state, step)
                )

To have each GPU dedicated to an agent, I think you can somehow create a mapping and replace ag_rreff.owner() with a CUDA device here, even if the environment is on the CPU. Did you assign different agents to the same CPU (i.e., did you get the same value of ag_rreff.owner() for different agents)?

Another side suggestion: you can also try the TensorPipe RPC backend to accelerate RPC performance over GPUs, by modifying the rpc.init_rpc lines.
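
Roughly something like this; just an untested sketch with placeholder option values, where name, rank and world_size are whatever you already pass to init_rpc:

import torch.distributed.rpc as rpc

# untested sketch: select the TensorPipe backend explicitly in init_rpc
options = rpc.TensorPipeRpcBackendOptions(
    num_worker_threads=16,  # placeholder value
    rpc_timeout=120,        # placeholder value, in seconds
)
rpc.init_rpc(
    name,                   # the worker name you already use
    rank=rank,
    world_size=world_size,
    backend=rpc.BackendType.TENSORPIPE,
    rpc_backend_options=options,
)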

"Did you get the same value of ag_rreff.owner() for different agents?" - Yes, the owner is the same: the environment, which runs on the CPU.

However, when I create the agents, I give each one its own GPU. (Also, when I select an action, I send the state to that agent's GPU.)

class Agent:
    def __init__(self):
        self.id = rpc.get_worker_info().id
        device = (self.id - 1) % args.num_gpus
        print("Initialasing Agent ID: {}, Device {}".format(self.id, device))
        self.device = ("cuda:" + str(device) if torch.cuda.is_available() else "cpu")
        self.policy = Policy()
        self.policy.to(self.device)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)

Does that help??

" I think you can somehow create a mapping and replace ag_rreff.owner() by a cuda device here, even if the environment is in the CPU" - How can I do that? I am a bit of newbie with RPC.

" Another side-suggestion: you can also try TensorPipe RPC backend to accelerate the RPC performance over GPUs, by modifying rpc.init_rpc lines." - I do not see any GPU options in TensorPipeRpcBackendOptions. Is there something I am missing here?

The first arg of the rpc_async method specifies the destination worker (and hence the device) that runs _call_method. Since you used the CPU as the device (probably the same core), you actually didn't parallelize the RPCs.

Can you show the code where you create ag_rrefs? I guess you probably just need to rewrite the loop over ag_rrefs in this way:

for i in range(len(self.ag_rrefs)):
    ag_rref = self.ag_rrefs[i]
    # the worker name probably is "worker0" if you only have one machine.
    destination_worker = "{}/cuda:{}".format(worker_name, i)
    futs.append(rpc_async(destination_worker, ...))

Additionally, unlike rref.local_value() used in _call_method, to move an RRef value from a different device/worker you probably need to call rref.to_here() instead.
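
For example, an untested variant of the helper (only relevant if the destination worker is not the RRef's owner):

# untested sketch: fetch the RRef's value to the calling worker before invoking the method
def _call_method_to_here(method, rref, *args, **kwargs):
    return method(rref.to_here(), *args, **kwargs)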

Regarding the usage of TensorPipe, please check out this example. To support GPU, you have to explicitly specify a device map.
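
The device-map part would look roughly like this; an untested sketch where the worker names and device indices are assumptions, and it only matters if CUDA tensors are actually sent over RPC:

import torch.distributed.rpc as rpc

# untested sketch: map the caller's CUDA devices to the callee's, per destination worker
options = rpc.TensorPipeRpcBackendOptions()
# e.g. on the master: tensors on the master's cuda:0 sent to "Agent1" arrive on Agent1's cuda:0,
# while those sent to "Agent2" arrive on Agent2's cuda:1
options.set_device_map("Agent1", {0: 0})
options.set_device_map("Agent2", {0: 1})
rpc.init_rpc("Environment", rank=0, world_size=3,
             rpc_backend_options=options)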

First, thanks for your answers; you are helping me a lot.

"Since you used the CPU as the device (probably the same core), you actually didn't parallelize the RPCs." - Actually, I just used a default gym environment, which usually runs on the CPU. However, if I used an environment that runs on a GPU, would that change things?

" Can you show the code how you created ag_rrefs ? I guess you probably just need to rewrite looping ag_rrefs in this way:" - Let me share all the code. ‘ag_rrefs’ is created in Master class init.

import argparse
import gym
import numpy as np
import os
from itertools import count

import torch
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributed.rpc import RRef, rpc_sync, rpc_async, remote
from torch.distributions import Categorical
import time

measure_inference = False
measure_select_action = True
TOTAL_EPISODE_STEP = 5000
MASTER_NAME = "Environment"
AGENT_NAME = "Agent{}"

parser = argparse.ArgumentParser(description='PyTorch RPC RL example')
parser.add_argument('--num-steps', type=int, default=200)
parser.add_argument('--num-gpus', type=int, default=torch.cuda.device_count())
parser.add_argument('--world-size', type=int, default=2, metavar='W',
                    help='world size for RPC, rank 0 is the agent, others are observers')
parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
                    help='discount factor (default: 0.99)')
parser.add_argument('--seed', type=int, default=543, metavar='N',
                    help='random seed (default: 543)')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                    help='interval between training status logs (default: 10)')
args = parser.parse_args()

torch.manual_seed(args.seed)


def _call_method(method, rref, *args, **kwargs):
    r"""
    a helper function to call a method on the given RRef
    """
    return method(rref.local_value(), *args, **kwargs)


def _remote_method(method, rref, *args, **kwargs):
    r"""
    a helper function to run method on the owner of rref and fetch back the
    result using RPC
    """
    args = [method, rref] + list(args)
    return rpc_sync(rref.owner(), _call_method, args=args, kwargs=kwargs)


class Policy(nn.Module):
    r"""
    Borrowing the ``Policy`` class from the Reinforcement Learning example.
    Copying the code to make these two examples independent.
    See https://github.com/pytorch/examples/tree/master/reinforcement_learning
    """
    def __init__(self):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 1024)
        self.affine6 = nn.Linear(1024, 2)
        self.hiddens_affine = nn.ModuleList([nn.Linear(1024, 1024) for _ in range(50)])

        self.saved_log_probs = []
        self.rewards = []

    def forward(self, x):
        if measure_inference:
            start_time_inference = time.time()
        x = self.affine1(x)
        for i, l in enumerate(self.hiddens_affine):
            x = self.hiddens_affine[i](x)
            x = F.relu(x)
        action_scores = self.affine6(x)
        if measure_inference:
            print("Inference time", time.time()-start_time_inferece)
        return F.softmax(action_scores, dim=1)

class Master:
    def __init__(self, world_size):
        self.ag_rrefs = []
        self.master_rref = RRef(self)
        self.rewards = []
        self.saved_log_probs = []
        self.eps = np.finfo(np.float32).eps.item()
        self.running_reward = 0

        for ag_rank in range(1, world_size):
            ag_info = rpc.get_worker_info(AGENT_NAME.format(ag_rank))
            # print("Ag_info", ag_info)
            self.ag_rrefs.append(remote(ag_info, Agent))

        # sentinel value -1000 marks "no action reported yet" for each agent
        self.current_actions = np.full(world_size - 1, -1000, dtype=np.int32)
        self.env = gym.make('CartPole-v1')
        self.env.seed(args.seed)
        self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold

        self.time_update = 0.
        self.num_time_update = 0.
        self.time_select_action = 0.
        self.num_time_select_action = 0.

    def select_actions_all_agents(self, state, step):

        # reset the sentinel values before collecting this step's actions
        self.current_actions = np.full(self.current_actions.shape, -1000, dtype=np.int32)
        futs = []
        start_time = time.time()
        for ag_rreff in self.ag_rrefs:
            # print(ag_rreff.owner())
            # make async RPC to kick off action selection on all agents
            futs.append(
                rpc_async(
                    ag_rreff.owner(),
                    _call_method,
                    args=(Agent.select_action, ag_rreff, self.master_rref, state, step)
                )
            )
            # if step % 50 == 0:
            #    print("Done", ag_rreff, time.time()-start_time)
        # wait until all agents have reported their action
        for fut in futs:
            fut.wait()
        if measure_select_action and step % 50 == 0:
           print("Total time", time.time() - start_time)

        self.time_select_action += (time.time() - start_time)
        self.num_time_select_action += 1

        # mean of the actions
        # print(self.current_actions)
        assert (self.current_actions>-2).all()
        action = int(np.round(np.mean(self.current_actions)))
        # print("Action", action, "All actions", self.current_actions)
        return action

    def report_action(self, agent_id, action):
        self.current_actions[agent_id-1] = action
    """
    def report_reward_all_agents(self, reward):
        futs = []
        for ag_rreff in self.ag_rrefs:
            # report the reward to the agent for training purpose
            futs.append(
                rpc_async(
                    ob_rref.owner(),
                    _call_method,
                    args=(Agent.report_reward, ag_rreff, self.agent_rref, reward)
                )
            )

        # wait until all observers have finished this episode
        for fut in futs:
            fut.wait()
    """
    def update_all_agents(self):

        # calculate the running reward
        self.sum_rewards = sum(self.rewards)
        self.running_reward = 0.05 * self.sum_rewards + (1 - 0.05) * self.running_reward

        start_time = time.time()
        futs = []
        for ag_rreff in self.ag_rrefs:
            # print("ag_rreff", ag_rreff, "owner", ag_rreff.owner())
            futs.append(
                rpc_async(
                    ag_rreff.owner(),
                    _call_method,
                    args=(Agent.finish_episode, ag_rreff, self.rewards)
                )
            )

        for fut in futs:
            fut.wait()
        self.time_update += (time.time() - start_time)
        self.num_time_update += 1

        self.rewards = []

    def run_episode(self, n_steps):
        r"""
        Run one episode of n_steps.

        Arguments:
            n_steps (int): number of steps in this episode
        """

        state, ep_reward = self.env.reset(), 0
        for step in range(n_steps):
            # send the state to the agents to get an action
            action = self.select_actions_all_agents(state, step)

            # apply the action to the environment, and get the reward
            state, reward, done, _ = self.env.step(action)

            # report the reward to the agent for training purpose
            self.rewards.append(reward)

            # if done:
            #    break

        self.update_all_agents()

        return self.sum_rewards, step


class Agent:
    def __init__(self):
        self.id = rpc.get_worker_info().id
        device = (self.id - 1) % args.num_gpus
        print("Initialasing Agent ID: {}, Device {}".format(self.id, device))
        self.device = ("cuda:" + str(device) if torch.cuda.is_available() else "cpu")
        self.rewards = []
        self.saved_log_probs = []
        # torch.manual_seed(args.seed+self.id)
        self.policy = Policy()
        self.policy.to(self.device)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
        self.eps = np.finfo(np.float32).eps.item()


    def select_action(self, master_rref, state, step):
        r"""
        This function is mostly borrowed from the Reinforcement Learning example.
        See https://github.com/pytorch/examples/tree/master/reinforcement_learning
        The main difference is that instead of keeping all probs in one list,
        the agent keeps probs in a dictionary, one key per observer.

        NB: no need to enforce thread-safety here as GIL will serialize
        executions.
        """
        start_time = time.time()
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        start_time_policy = time.time()
        probs = self.policy(state)
        # if step % 50 == 0:
        #    print(state)
        # if step % 50 == 0:
        #    print("Id{} Step {} Time for policy{}".format(self.id, step, time.time()-start_time_policy))
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs.append(m.log_prob(action))
        # print(master_rref, self.id,  action.item())
        # if step % 50 == 0:
        #    print("Agent {} Time select action {}".format(self.id, time.time() - start_time))
        _remote_method(Master.report_action, master_rref, self.id,  action.item())

    def finish_episode(self, saved_rewards):
        r"""
        This function is mostly borrowed from the Reinforcement Learning example.
        See https://github.com/pytorch/examples/tree/master/reinforcement_learning
        The main difference is that it joins all probs and rewards from
        different observers into one list, and uses the minimum observer rewards
        as the reward of the current episode.
        """

        # joins probs and rewards from different observers into lists
        # print("SAVED_REWARDS", saved_rewards)
        R, probs, rewards = 0, self.saved_log_probs, saved_rewards

        policy_loss, returns = [], []
        for r in rewards[::-1]:
            R = r + args.gamma * R
            returns.insert(0, R)

        # print("RETURNS BEFORE TORCH", returns)

        returns = torch.tensor(returns).to(self.device)
        returns = (returns - returns.mean()) / (returns.std() + self.eps)
        for log_prob, R in zip(probs, returns):
            policy_loss.append(-log_prob * R)
        self.optimizer.zero_grad()
        policy_loss = torch.cat(policy_loss).sum()
        # print("POLICY LOSS", policy_loss)

        # print("Saved log probs {}, saved rewards {}".format(len(self.saved_log_probs), len(saved_rewards)))
        start_time = time.time()
        policy_loss.backward()
        self.optimizer.step()
        # print("End time {} id {}".format(time.time()-start_time, self.id))

        # clear saved probs
        self.saved_log_probs = []


def report_statistics(master, i_episode, start_time, total_steps):
    if i_episode % args.log_interval == 0:
        print('Ep {}\tAvg r: {:.2f}\t Avg t(1000): {:.3f}'
              '\tAvg t updt: {:.4f}\tAvg t select act: {:.4f}'.format(
            i_episode,
            master.running_reward,
            1000 * (time.time() - start_time) / total_steps,
            master.time_update / master.num_time_update,
            master.time_select_action / master.num_time_select_action))
        start_time = time.time()
        total_steps = 0
        master.time_update = 0.
        master.num_time_update = 0.
        master.time_select_action = 0.
        master.num_time_select_action = 0.

    return start_time, total_steps


def run_worker(rank, world_size):
    r"""
    This is the entry point for all processes. The rank 0 is the agent. All
    other ranks are observers.
    """
    # print("RANK", rank)
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        # rank 0 is the master (environment)
        rpc.init_rpc(MASTER_NAME, rank=rank, world_size=world_size)

        master = Master(world_size)
        start_time = time.time()
        total_steps = 0
        for i_episode in count(1):
            n_steps = args.num_steps # int(TOTAL_EPISODE_STEP / (args.world_size - 1))
            last_reward, steps = master.run_episode(n_steps=n_steps)

            total_steps += steps

            start_time, total_steps = report_statistics(master, i_episode, start_time, total_steps)
            if master.running_reward > master.reward_threshold or i_episode >= 10:
                print("Solved! Running reward is now {}!".format(master.running_reward))
                break
    else:
        # other ranks are the agents
        rpc.init_rpc(AGENT_NAME.format(rank), rank=rank, world_size=world_size)
        # agents passively wait for instructions from the master
    rpc.shutdown()


def main():
    print("Number of GPU available", torch.cuda.device_count())
    mp.spawn(
        run_worker,
        args=(args.world_size, ),
        nprocs=args.world_size,
        join=True
    )


if __name__ == '__main__':
    main()

When I try to use the example you proposed, I get the following error (note that the workers are called Agent0, Agent1, etc.):

Unknown destination worker Agent0/cuda:0

" Additionally, unlike rref.local_value() used in _call_method , to move a rref value from a different device, you probably need to call rref.to_here() instead."

Using to_here() in the initial code does not seem to improve performance.

Unknown destination worker Agent0/cuda:0

After taking a look at the API, I think rpc_async can only accept a str or a WorkerInfo as the destination worker, so AGENT_NAME.format(ag_rank) (like Agent0) should meet the syntax requirement here.
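
As an untested sketch reusing the names from your full code, the dispatch loop could address each agent's worker by name; each Agent already pins its policy to its own GPU in __init__:

# untested sketch: one async RPC per agent worker, addressed by worker name
futs = []
for ag_rank, ag_rreff in enumerate(self.ag_rrefs, start=1):
    futs.append(
        rpc_async(
            AGENT_NAME.format(ag_rank),  # e.g. "Agent1", matching how the agents were registered
            _call_method,
            args=(Agent.select_action, ag_rreff, self.master_rref, state, step),
        )
    )
for fut in futs:
    fut.wait()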