I am doing RL where episodes are run in parallel using multiprocessing. Here is some minimal pseudo-code that reflects the workflow, where optim.step()
is called in each subprocess:
def episode_runner(in_q, out_q, optim, model):
    """Worker loop: run one episode per request and step the optimizer locally.

    Blocks on ``in_q`` for episode data; a falsy sentinel (e.g. ``None``)
    terminates the loop. The scalar loss of each episode is reported back
    on ``out_q``.
    """
    env = MyEnv()
    while True:
        episode_data = in_q.get()
        if not episode_data:
            # Sentinel received — shut this worker down.
            break
        trajectory = run_episode(env, episode_data, model)
        optim.zero_grad()
        loss = compute_loss(trajectory)
        loss.backward()
        optim.step()
        out_q.put(loss.item())
def launch_subprocesses(n_procs, optim, model):
    """Start ``n_procs`` episode-runner workers, each with its own I/O queues.

    Returns ``(procs, in_qs, out_qs)`` so the caller can feed work to each
    worker and later join them.

    Bug fix: the original never appended the started ``Process`` objects to
    ``procs``, so it returned an empty list and ``terminate_subprocesses``
    had nothing to join — the workers were orphaned.
    """
    procs = []
    in_qs = []
    out_qs = []
    for _ in range(n_procs):
        in_q = SimpleQueue()
        out_q = SimpleQueue()
        proc = Process(
            target=episode_runner,
            args=(in_q, out_q, optim, model))
        proc.start()
        procs.append(proc)  # was missing: keep the handle so we can join later
        in_qs.append(in_q)
        out_qs.append(out_q)
    return procs, in_qs, out_qs
def terminate_subprocesses(in_qs, procs):
    """Shut down all workers: send each a ``None`` sentinel, then join them.

    All sentinels are sent before any join so every worker can exit its
    receive loop concurrently.
    """
    for queue in in_qs:
        queue.put(None)
    for worker in procs:
        worker.join()
def training_loop(n_procs, optim, model, n_epochs):
    """Run ``n_epochs`` of training; each worker steps the optimizer itself.

    Per epoch: broadcast the same episode data to every worker, then block
    until every worker has reported a loss.
    """
    procs, in_qs, out_qs = launch_subprocesses(n_procs, optim, model)
    for _ in range(n_epochs):
        batch = generate_data()
        for channel in in_qs:
            channel.put(batch)
        # One get() per worker doubles as a synchronization barrier.
        losses = [channel.get() for channel in out_qs]
    terminate_subprocesses(in_qs, procs)
Alternatively, rather than calling optim.step()
within each subprocess, I can call it once in the main process after letting the gradients accumulate:
def episode_runner(in_q, out_q, model):
    """Worker loop for the accumulate-in-main variant: no optimizer here.

    ``loss.backward()`` only accumulates into the parameters' ``.grad``
    buffers; zeroing and stepping are the main process's responsibility.
    A falsy sentinel on ``in_q`` ends the loop.
    """
    env = MyEnv()
    while True:
        episode_data = in_q.get()
        if not episode_data:
            break
        trajectory = run_episode(env, episode_data, model)
        loss = compute_loss(trajectory)
        loss.backward()
        out_q.put(loss.item())
def training_loop(n_procs, optim, model, n_epochs):
    """Run ``n_epochs`` of training with a single optimizer step per epoch,
    taken in the main process after all workers have accumulated gradients.

    Bug fix (race condition): the original called ``optim.zero_grad()``
    *after* dispatching the episode data. A fast worker could finish
    ``loss.backward()`` before the main process zeroed, silently discarding
    that worker's gradient contribution. Zeroing must happen before any
    worker can start accumulating.

    NOTE(review): correct accumulation also assumes the model's ``.grad``
    buffers live in shared memory visible to all processes — confirm
    ``model.share_memory()`` (or equivalent) is done before launch.
    """
    procs, in_qs, out_qs = \
        launch_subprocesses(n_procs, model)
    for epoch in range(n_epochs):
        episode_data = generate_data()
        # Zero BEFORE dispatching: workers call backward() as soon as they
        # receive data, so zeroing afterwards races with their accumulation.
        optim.zero_grad()
        for in_q in in_qs:
            in_q.put(episode_data)
        # Draining one loss per worker is the barrier guaranteeing every
        # backward() has finished before we step.
        losses = [out_q.get() for out_q in out_qs]
        optim.step()
    terminate_subprocesses(in_qs, procs)
My model is learning properly with either approach, and so I wonder: which one should I prefer? Does it make any difference?