First, thanks for your answers; you are helping me a lot.
“Since you used CPU as the device (probably the same core), you actually didn’t parallelize the RPCs.” - Actually, I just used a default gym environment, which usually runs on the CPU. However, if I used an environment that runs on a GPU, would that change things?
" Can you show the code how you created ag_rrefs
? I guess you probably just need to rewrite looping ag_rrefs
in this way:" - Let me share all the code. ‘ag_rrefs’ is created in Master class init.
import argparse
import gym
import numpy as np
import os
from itertools import count
import torch
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributed.rpc import RRef, rpc_sync, rpc_async, remote
from torch.distributions import Categorical
import time
measure_inference = False
measure_select_action = True
TOTAL_EPISODE_STEP = 5000
MASTER_NAME = "Environment"
AGENT_NAME = "Agent{}"
parser = argparse.ArgumentParser(description='PyTorch RPC RL example')
parser.add_argument('--num-steps', type=int, default=200)
parser.add_argument('--num-gpus', type=int, default=torch.cuda.device_count())
parser.add_argument('--world-size', type=int, default=2, metavar='W',
help='world size for RPC, rank 0 is the agent, others are observers')
parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
help='discount factor (default: 0.99)')
parser.add_argument('--seed', type=int, default=543, metavar='N',
help='random seed (default: 543)')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
help='interval between training status logs (default: 10)')
args = parser.parse_args()
torch.manual_seed(args.seed)
def _call_method(method, rref, *args, **kwargs):
r"""
a helper function to call a method on the given RRef
"""
return method(rref.local_value(), *args, **kwargs)
def _remote_method(method, rref, *args, **kwargs):
r"""
a helper function to run method on the owner of rref and fetch back the
result using RPC
"""
args = [method, rref] + list(args)
return rpc_sync(rref.owner(), _call_method, args=args, kwargs=kwargs)
class Policy(nn.Module):
r"""
Borrowing the ``Policy`` class from the Reinforcement Learning example.
Copying the code to make these two examples independent.
See https://github.com/pytorch/examples/tree/master/reinforcement_learning
"""
def __init__(self):
super(Policy, self).__init__()
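        # input layer (4 -> 1024), 50 hidden layers (1024 -> 1024), output layer (1024 -> 2)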
self.affine1 = nn.Linear(4, 1024)
self.affine6 = nn.Linear(1024, 2)
self.hiddens_affine = nn.ModuleList([nn.Linear(1024, 1024) for _ in range(50)])
self.saved_log_probs = []
self.rewards = []
def forward(self, x):
if measure_inference:
            start_time_inference = time.time()
x = self.affine1(x)
for i, l in enumerate(self.hiddens_affine):
x = self.hiddens_affine[i](x)
x = F.relu(x)
action_scores = self.affine6(x)
if measure_inference:
print("Inference time", time.time()-start_time_inferece)
return F.softmax(action_scores, dim=1)
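# The master (rank 0) owns the single CartPole environment. At every step it
# sends the state to all agents, waits for each of them to report an action,
# and steps the environment with the rounded mean of those actions.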
class Master:
def __init__(self, world_size):
self.ag_rrefs = []
self.master_rref = RRef(self)
self.rewards = []
self.saved_log_probs = []
self.eps = np.finfo(np.float32).eps.item()
self.running_reward = 0
for ag_rank in range(1, world_size):
ag_info = rpc.get_worker_info(AGENT_NAME.format(ag_rank))
# print("Ag_info", ag_info)
self.ag_rrefs.append(remote(ag_info, Agent))
        # -1000 is a sentinel meaning "no action reported yet"
        # (note: np.zeros(...) * -1000 would just stay all zeros)
        self.current_actions = np.full(world_size - 1, -1000, dtype=np.int32)
self.env = gym.make('CartPole-v1')
self.env.seed(args.seed)
self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold
self.time_update = 0.
self.num_time_update = 0.
self.time_select_action = 0.
self.num_time_select_action = 0.
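    # Send the current state to every agent via async RPC, wait until all of
    # them have reported an action back, and return the rounded mean action.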
def select_actions_all_agents(self, state, step):
        self.current_actions = np.full(self.current_actions.shape, -1000, dtype=np.int32)
futs = []
start_time = time.time()
for ag_rreff in self.ag_rrefs:
# print(ag_rreff.owner())
# make async RPC to kick off an episode on all observers
futs.append(
rpc_async(
ag_rreff.owner(),
_call_method,
args=(Agent.select_action, ag_rreff, self.master_rref, state, step)
)
)
# if step % 50 == 0:
# print("Done", ag_rreff, time.time()-start_time)
        # wait until all agents have reported their action
for fut in futs:
fut.wait()
if measure_select_action and step % 50 == 0:
print("Total time", time.time() - start_time)
self.time_select_action += (time.time() - start_time)
self.num_time_select_action += 1
# mean of the actions
# print(self.current_actions)
        # every agent must have reported a real action (the sentinel is -1000)
        assert (self.current_actions > -2).all()
action = int(np.round(np.mean(self.current_actions)))
# print("Action", action, "All actions", self.current_actions)
return action
def report_action(self, agent_id, action):
self.current_actions[agent_id-1] = action
"""
def report_reward_all_agents(self, reward):
futs = []
for ag_rreff in self.ag_rrefs:
# report the reward to the agent for training purpose
futs.append(
rpc_async(
                    ag_rreff.owner(),
                    _call_method,
                    args=(Agent.report_reward, ag_rreff, self.master_rref, reward)
)
)
# wait until all obervers have finished this episode
for fut in futs:
fut.wait()
"""
def update_all_agents(self):
# calculate the running reward
self.sum_rewards = sum(self.rewards)
self.running_reward = 0.05 * self.sum_rewards + (1 - 0.05) * self.running_reward
start_time = time.time()
futs = []
for ag_rreff in self.ag_rrefs:
# print("ag_rreff", ag_rreff, "owner", ag_rreff.owner())
futs.append(
rpc_async(
ag_rreff.owner(),
_call_method,
args=(Agent.finish_episode, ag_rreff, self.rewards)
)
)
for fut in futs:
fut.wait()
self.time_update += (time.time() - start_time)
self.num_time_update += 1
self.rewards = []
def run_episode(self, n_steps):
r"""
Run one episode of n_steps.
Arguments:
n_steps (int): number of steps in this episode
"""
state, ep_reward = self.env.reset(), 0
for step in range(n_steps):
# send the state to the agents to get an action
action = self.select_actions_all_agents(state, step)
# apply the action to the environment, and get the reward
state, reward, done, _ = self.env.step(action)
# report the reward to the agent for training purpose
self.rewards.append(reward)
# if done:
# break
self.update_all_agents()
return self.sum_rewards, step
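# Each agent process holds its own replica of the policy on its assigned GPU;
# it computes an action whenever the master asks for one and trains its
# replica at the end of every episode.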
class Agent:
def __init__(self):
self.id = rpc.get_worker_info().id
        # round-robin assignment of agents to GPUs (fall back to 0 if there is no GPU)
        device = (self.id - 1) % args.num_gpus if args.num_gpus > 0 else 0
        print("Initializing Agent ID: {}, Device {}".format(self.id, device))
self.device = ("cuda:" + str(device) if torch.cuda.is_available() else "cpu")
self.rewards = []
self.saved_log_probs = []
# torch.manual_seed(args.seed+self.id)
self.policy = Policy()
self.policy.to(self.device)
self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
self.eps = np.finfo(np.float32).eps.item()
def select_action(self, master_rref, state, step):
r"""
This function is mostly borrowed from the Reinforcement Learning example.
See https://github.com/pytorch/examples/tree/master/reinforcement_learning
        The main difference is that the chosen action is reported back to the
        master via RPC instead of being returned, while the log prob is kept
        locally for the update.
NB: no need to enforce thread-safety here as GIL will serialize
executions.
"""
start_time = time.time()
state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
start_time_policy = time.time()
probs = self.policy(state)
# if step % 50 == 0:
# print(state)
# if step % 50 == 0:
# print("Id{} Step {} Time for policy{}".format(self.id, step, time.time()-start_time_policy))
m = Categorical(probs)
action = m.sample()
self.saved_log_probs.append(m.log_prob(action))
# print(master_rref, self.id, action.item())
# if step % 50 == 0:
# print("Agent {} Time select action {}".format(self.id, time.time() - start_time))
_remote_method(Master.report_action, master_rref, self.id, action.item())
def finish_episode(self, saved_rewards):
r"""
This function is mostly borrowed from the Reinforcement Learning example.
See https://github.com/pytorch/examples/tree/master/reinforcement_learning
        The main difference is that the rewards come from the master (the single
        shared environment) and each agent updates only its own local policy
        replica.
"""
# joins probs and rewards from different observers into lists
# print("SAVED_REWARDS", saved_rewards)
R, probs, rewards = 0, self.saved_log_probs, saved_rewards
policy_loss, returns = [], []
for r in rewards[::-1]:
R = r + args.gamma * R
returns.insert(0, R)
# print("RETURNS BEFORE TORCH", returns)
returns = torch.tensor(returns).to(self.device)
returns = (returns - returns.mean()) / (returns.std() + self.eps)
for log_prob, R in zip(probs, returns):
policy_loss.append(-log_prob * R)
self.optimizer.zero_grad()
policy_loss = torch.cat(policy_loss).sum()
# print("POLICY LOSS", policy_loss)
# print("Saved log probs {}, saved rewards {}".format(len(self.saved_log_probs), len(saved_rewards)))
start_time = time.time()
policy_loss.backward()
self.optimizer.step()
# print("End time {} id {}".format(time.time()-start_time, self.id))
# clear saved probs
self.saved_log_probs = []
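# Periodically print the running reward and the average update / action-selection
# times collected by the master.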
def report_statistics(master, i_episode, start_time, total_steps):
if i_episode % args.log_interval == 0:
print('Ep {}\tAvg r: {:.2f}\t Avg t(1000): {:.3f}'
'\tAvg t updt: {:.4f}\tAvg t select act: {:.4f}'.format(
i_episode,
master.running_reward,
1000 * (time.time() - start_time) / total_steps,
master.time_update / master.num_time_update,
master.time_select_action / master.num_time_select_action))
start_time = time.time()
total_steps = 0
master.time_update = 0.
master.num_time_update = 0.
master.time_select_action = 0.
master.num_time_select_action = 0.
return start_time, total_steps
def run_worker(rank, world_size):
r"""
    This is the entry point for all processes. Rank 0 is the master that owns
    the environment; all other ranks are agents.
"""
# print("RANK", rank)
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
if rank == 0:
        # rank 0 is the master that owns the environment
rpc.init_rpc(MASTER_NAME, rank=rank, world_size=world_size)
master = Master(world_size)
start_time = time.time()
total_steps = 0
for i_episode in count(1):
n_steps = args.num_steps # int(TOTAL_EPISODE_STEP / (args.world_size - 1))
last_reward, steps = master.run_episode(n_steps=n_steps)
total_steps += steps
start_time, total_steps = report_statistics(master, i_episode, start_time, total_steps)
if master.running_reward > master.reward_threshold or i_episode >= 10:
print("Solved! Running reward is now {}!".format(master.running_reward))
break
else:
        # other ranks are the agents
        rpc.init_rpc(AGENT_NAME.format(rank), rank=rank, world_size=world_size)
        # agents passively wait for RPCs from the master until shutdown
rpc.shutdown()
def main():
print("Number of GPU available", torch.cuda.device_count())
mp.spawn(
run_worker,
args=(args.world_size, ),
nprocs=args.world_size,
join=True
)
if __name__ == '__main__':
main()
When I try to use the example you proposed, I get the following error (note that the workers are called Agent0, Agent1, etc.):
Unknown destination worker Agent0/cuda:0
" Additionally, unlike rref.local_value()
used in _call_method
, to move a rref value from a different device, you probably need to call rref.to_here()
instead."
Using to_here() in the initial code does not seem to improve performance.
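For reference, this is roughly the variant I tried (just a sketch; I only swapped local_value() for to_here() in the helper, and the name _call_method_to_here is only for illustration):

def _call_method_to_here(method, rref, *args, **kwargs):
    r"""
    same as _call_method above, but materializes the RRef value with
    to_here() (which copies it to the calling worker when it is owned
    elsewhere) instead of local_value()
    """
    return method(rref.to_here(), *args, **kwargs)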