Hi!
I’m trying to start a multiprocessing training task with a PPO algorithm. The same setup worked well with TD3, but with PPO it fails with a "cannot pickle '_thread.lock' object" error as soon as the processes are launched, i.e. when the start() method is called.
import gymnasium as gym
import numpy as np
from AsyncPPO import Worker
from PPO_torch import PPO
import torch.multiprocessing as mp

NUM_WORKERS = 3
UPDATE_ITER = 3 * 32
MAX_STEPS = 10000
GAMMA = 0.99
TAU = 0.0002
EXP_EPS = 128 * 10

if __name__ == "__main__":
    env = gym.make('MountainCarContinuous-v0')
    obs_shape = env.observation_space.shape[0]
    prev_obs = np.zeros(obs_shape)
    action_shape = env.action_space.shape[0]
    global_agent = PPO(num_states=obs_shape, num_actions=action_shape,
                       mp=True, critic_lr=0.0002, actor_lr=0.0002)
    global_agent.share_memory()
    success_counter = 0
    step_counter = 0
    workers = []
    mp.set_start_method("spawn")
    for i in range(NUM_WORKERS):
        worker = Worker(globalAC=global_agent, name=str(i))
        t = mp.Process(target=worker.work)
        t.start()
        workers.append(t)
    for thread in workers:
        if not thread.is_alive():
            thread.join()
And it gives the following error:
Traceback (most recent call last):
  File "/home/itca-control/rl_ws/mpc_ws/src/rl_network/gym_test/AsyncPPO_mountain_car.py", line 40, in <module>
    t.start()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/lib/python3.8/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/lib/python3.8/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object
[W CudaIPCTypes.cpp:15] Producer process has been terminated before all shared CUDA tensors released. See Note [Sharing CUDA tensors]
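As far as I can tell, with the "spawn" start method, mp.Process(target=worker.work) has to pickle the bound method, which drags the whole Worker instance along with it. I can reproduce the exact same TypeError with a tiny standalone sketch (Holder is just an illustrative name, not from my code):

import pickle
import threading

class Holder:
    def __init__(self):
        # the same kind of object the traceback complains about
        self.lock = threading.Lock()

    def work(self):
        pass

h = Holder()
# Pickling the bound method pickles the instance, lock included:
pickle.dumps(h.work)  # TypeError: cannot pickle '_thread.lock' object

So I’m guessing something inside my Worker (the env created with render_mode="human", or one of the PPO models) holds such a lock.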
I’m using PyTorch 2.0.1+cu117 and Python 3.8.
The Worker class is as follows, in case it helps:
import gymnasium as gym
import numpy as np
import torch.multiprocessing as mp
import utils
from PPO_torch import PPO


class Worker(mp.Process):
    def __init__(self, globalAC: PPO, name: str,
                 env=None, render=None) -> None:
        """
        Initialize the worker trainer.
        """
        super().__init__()  # required when subclassing mp.Process
        if env is not None:
            if render is not None:
                self.env = gym.make(env, render_mode=render)
            else:
                self.env = gym.make(env)
        else:
            self.env = gym.make('MountainCarContinuous-v0', render_mode="human")
        self.num_states = self.env.observation_space.shape[0]
        self.num_actions = self.env.action_space.shape[0]
        self.global_model = globalAC
        self.local_model = PPO(self.num_states, self.num_actions,
                               critic_lr=0.0003, actor_lr=0.0002)
        self.name = name
        self.update_iter = self.local_model.buffer.buffer_capacity

    def work(self, max_steps=10, alow=-1, aup=1, exp_ep=1280):
        """
        Let the worker interact with the env, collect experience,
        and add it to the master's replay buffer.
        """
        total_step = 0
        success_count = 0
        for ep in range(max_steps):
            obs, _ = self.env.reset()
            done = False
            ep_reward = 0.0
            while not done:
                if total_step > exp_ep:
                    action = self.local_model.sample_action(obs, a_lower=alow, a_upper=aup)
                    action = utils.to_numpy(action)
                else:
                    action = self.env.action_space.sample()
                obs, reward, done, _, _ = self.env.step(action)
                next_obs = np.squeeze(obs)
                self.local_model.buffer.record((obs, action, reward, next_obs, done))
                total_step += 1
                ep_reward += reward
                if total_step % self.update_iter == 0:
                    global_buffer = self.local_model.buffer.on_policy_collect()
                    self.global_model.update_from_buffer(global_buffer)
                    # Pull the update to local
                    self.local_model.actor.load_state_dict(self.global_model.actor.state_dict())
                    self.local_model.critic.load_state_dict(self.global_model.critic.state_dict())
                print("worker " + self.name +
                      " Ep " + str(ep) +
                      " Step " + str(total_step) + " reward " + str(200 * reward))
                if reward >= 100:
                    done = True
                if done:
                    print("Worker " + self.name + " training successful")
                    break
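Would the right fix be to construct the Worker inside the child process, so that only the shared global agent and a name string have to be pickled? Something like this untested sketch is what I have in mind (spawn_worker is my own name, and this assumes the offending lock lives in the env or the local model rather than in the shared global_agent):

def spawn_worker(globalAC, name):
    # Build the Worker (env and local PPO included) inside the child,
    # so nothing holding a _thread.lock has to cross the process boundary.
    worker = Worker(globalAC=globalAC, name=name)
    worker.work()

# in the main script, instead of mp.Process(target=worker.work):
for i in range(NUM_WORKERS):
    t = mp.Process(target=spawn_worker, args=(global_agent, str(i)))
    t.start()
    workers.append(t)

Or is there a cleaner way to do this with torch.multiprocessing?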