"cannot pickle '_thread.lock' object" when using the PyTorch multiprocessing package with the spawn start method

Hi!

I’m trying to run a multiprocessing task with the PPO algorithm. The same setup worked well when I was using the TD3 algorithm, but with PPO it fails with the error that a _thread.lock object cannot be pickled. It fails UPON STARTING, i.e. as soon as the start() method is called.

import gymnasium as gym
import numpy as np
from AsyncPPO import Worker
from PPO_torch import PPO
import torch.multiprocessing as mp


# Number of worker processes launched by the script below.
NUM_WORKERS = 3
# NOTE(review): the remaining constants are not referenced anywhere in this
# script; presumably they are tuning knobs meant to be passed to the agent or
# to Worker.work() -- confirm before relying on them.
UPDATE_ITER = 3*32
MAX_STEPS = 10000
GAMMA = 0.99
TAU = 0.0002
EXP_EPS = 128*10


if __name__ == "__main__":
    # Throwaway environment: only used to read the observation/action sizes
    # needed to build the shared agent, so release it right away.
    env = gym.make('MountainCarContinuous-v0')
    obs_shape = env.observation_space.shape[0]
    action_shape = env.action_space.shape[0]
    env.close()

    global_agent = PPO(num_states=obs_shape, num_actions=action_shape, mp=True, critic_lr=0.0002, actor_lr=0.0002)
    global_agent.share_memory()

    workers = []
    mp.set_start_method("spawn")
    for i in range(NUM_WORKERS):
        worker = Worker(globalAC=global_agent, name=str(i))
        # With the "spawn" start method the process object (and therefore the
        # bound method `worker.work`, the worker and everything it references)
        # is pickled in start(); any unpicklable attribute fails right there.
        proc = mp.Process(target=worker.work)
        proc.start()

        workers.append(proc)

    # Wait for every worker. The previous loop joined only processes that had
    # already exited, so the parent never explicitly waited for running ones.
    for proc in workers:
        proc.join()

And it gives the following error:

Traceback (most recent call last):
  File "/home/itca-control/rl_ws/mpc_ws/src/rl_network/gym_test/AsyncPPO_mountain_car.py", line 40, in <module>
    t.start()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/lib/python3.8/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/lib/python3.8/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object
[W CudaIPCTypes.cpp:15] Producer process has been terminated before all shared CUDA tensors released. See Note [Sharing CUDA tensors]

I’m using PyTorch 2.0.1+cu117 and Python 3.8.
The worker class is as follows, in case it helps:

class Worker(mp.Process):
    """
    Worker that steps its own environment with a local PPO model and
    periodically pushes collected experience to a shared global model.

    The rollout loop lives in ``work()``; ``run()`` delegates to it so the
    worker can be launched either with ``Worker(...).start()`` or by passing
    ``worker.work`` as the target of a separate process.
    """

    def __init__(self, globalAC: PPO, name: str,
                 env=None, render=None) -> None:
        """
        Initialize worker trainer

        :param globalAC: global model that receives this worker's experience
            and whose actor/critic weights are copied back into the local model.
        :param name: worker name, used in the progress messages.
        :param env: optional environment id passed to ``gym.make``; when
            omitted, 'MountainCarContinuous-v0' is created with
            ``render_mode="human"``.
        :param render: optional render mode, only used together with ``env``.
        """
        # A Process subclass that overrides __init__ must invoke the base
        # constructor before doing anything else with the instance.
        super().__init__()

        if env is not None:
            if render is not None:
                # The rendered environment used to be created and thrown away;
                # keep it so the requested render mode actually takes effect.
                self.env = gym.make(env, render_mode=render)
            else:
                self.env = gym.make(env)
        else:
            self.env = gym.make('MountainCarContinuous-v0', render_mode="human")

        self.num_states = self.env.observation_space.shape[0]
        self.num_actions = self.env.action_space.shape[0]

        self.global_model = globalAC
        self.local_model = PPO(self.num_states, self.num_actions, critic_lr=0.0003, actor_lr=0.0002)
        self.name = name
        # Push experience to the global model every time the local buffer's
        # capacity worth of steps has been collected.
        self.update_iter = self.local_model.buffer.buffer_capacity

    def run(self) -> None:
        """Process entry point: run ``work()`` with its default arguments."""
        self.work()

    def work(self, max_steps=10, alow=-1, aup=1, exp_ep=1280):
        """
        Let the worker interact with the env and collect experience.

        Every ``self.update_iter`` steps the locally collected experience is
        handed to the global model, and the global actor/critic weights are
        then copied back into the local model.

        :param max_steps: maximum number of episodes to run.
        :param alow: lower action bound forwarded to ``sample_action``.
        :param aup: upper action bound forwarded to ``sample_action``.
        :param exp_ep: number of initial steps (counted across episodes) during
            which actions are sampled randomly from the action space instead
            of from the local model.
        """
        total_step = 0

        for ep in range(max_steps):
            obs, _ = self.env.reset()
            done = False
            solved = False

            while not done:
                if total_step > exp_ep:
                    action = self.local_model.sample_action(obs, a_lower=alow, a_upper=aup)
                    action = utils.to_numpy(action)
                else:
                    action = self.env.action_space.sample()

                # Keep the pre-step observation intact: it used to be
                # overwritten by the step result, so every stored transition
                # had identical `obs` and `next_obs`.
                raw_next_obs, reward, terminated, truncated, _ = self.env.step(action)
                next_obs = np.squeeze(raw_next_obs)
                self.local_model.buffer.record((obs, action, reward, next_obs, terminated))
                obs = raw_next_obs

                total_step += 1
                if total_step % self.update_iter == 0:
                    global_buffer = self.local_model.buffer.on_policy_collect()
                    self.global_model.update_from_buffer(global_buffer)

                    # Pull the update to local
                    self.local_model.actor.load_state_dict(self.global_model.actor.state_dict())
                    self.local_model.critic.load_state_dict(self.global_model.critic.state_dict())

                print(f"worker {self.name} Ep {ep} Step {total_step} reward {200*reward!s}")

                # A step reward of at least 100 is treated as solving the task.
                solved = reward >= 100
                # Truncation (e.g. a time limit) must end the episode as well;
                # otherwise the loop keeps stepping an environment that needs
                # a reset.
                done = terminated or truncated or solved

            # Stop training only on success. The previous check tested `done`,
            # which is always true once the episode loop exits, so training
            # ended after the very first episode.
            if solved:
                print("Worker " + self.name + " training successful")
                break
1 Like

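It turns out this problem occurs because I imported tensorboard and created a SummaryWriter. The writer internally holds thread-related objects (including a _thread.lock), and with the spawn start method everything reachable from the process target is pickled when start() is called, so the lock makes pickling fail.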

1 Like

Hi, I am facing the same issue with tensorboard. Did you find a solution to this problem?

In my case, I needed a SummaryWriter to log stuff. But it has a file handler, which makes parallelism impossible - see torch.save causes TypeError: can't pickle SwigPyObject objects · Issue #32046 · pytorch/pytorch · GitHub.

A simple fix I used was to instantiate a new SummaryWriter every time I logged, rather than keeping a single instance as an attribute.

i.e. instead of

# Problematic pattern: a single long-lived writer is stored on the object, so
# the object carries the writer along whenever it has to be pickled.
self.writer = SummaryWriter(...)

# somewhere in the code
# NOTE(review): `.log(...)` looks like a placeholder for the actual logging
# call -- confirm against the SummaryWriter API in use.
self.writer.log(...)

I did:

# Workaround: keep only the log directory on the object instead of the writer.
self.log_dir = ...

# somewhere in the code
# A fresh writer is built for each logging call, so no writer instance is ever
# stored on the object.
# NOTE(review): presumably each temporary writer should also be flushed/closed
# after use -- confirm.
SummaryWriter(self.log_dir).log(...)