DDP with custom logger

Hi everyone,

I am getting the error below when running DDP with a custom logger class.

Starting process 0
Starting process 1
Error executing job with overrides: []
Traceback (most recent call last):
  File "/home/mazen/Projects/ttorch/src/train.py", line 68, in main
    trainer.fit()
  File "/home/mazen/Projects/ttorch/src/trainer.py", line 280, in fit
    p.start()
  File "/home/mazen/miniconda3/envs/jdt/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/home/mazen/miniconda3/envs/jdt/lib/python3.9/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/home/mazen/miniconda3/envs/jdt/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/home/mazen/miniconda3/envs/jdt/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/home/mazen/miniconda3/envs/jdt/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/home/mazen/miniconda3/envs/jdt/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/home/mazen/miniconda3/envs/jdt/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle '_thread.RLock' object

I am trying to build a Trainer class that uses a custom Logger class so I can plug in different logging backends, such as logging to a file, WandB, TensorBoard, etc. I have a working example where the logger runs under DDP: each process puts its log records on a queue, and the main process drains the queue and does all of the actual logging. Here is the code I based this on: Example for handling multiprocess logging when using `torch.distributed` · GitHub
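For context, the pattern in that gist boils down to something like the following minimal sketch (placeholder names, not the gist verbatim, and no DDP involved): each worker process attaches a `QueueHandler` pointing at a shared `multiprocessing.Queue`, and only the main process owns the real handlers, draining the queue with a `QueueListener`.

import logging
import multiprocessing as mp
from logging.handlers import QueueHandler, QueueListener


def worker(rank, log_queue):
    # Workers only talk to the queue; no file/stream handlers are created here.
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(QueueHandler(log_queue))
    logging.info("hello from rank %d", rank)


if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)
    log_queue = mp.Queue(-1)

    # The main process owns the real handler and drains the queue.
    console = logging.StreamHandler()
    console.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
    listener = QueueListener(log_queue, console, respect_handler_level=True)
    listener.start()

    procs = [mp.Process(target=worker, args=(rank, log_queue)) for rank in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    listener.stop()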

I followed the approach from the code referenced above, but split it into its own Logger class, with a separate Trainer class. I will share the relevant code and leave out the irrelevant parts:

logger.py

import logging
from logging import Filter
from logging.handlers import QueueListener
from multiprocessing import Queue


class WorkerLogFilter(Filter):
    def __init__(self, rank=-1):
        super().__init__()
        self._rank = rank

    def filter(self, record):
        # Prefix every record with the rank of the worker that emitted it.
        if self._rank != -1:
            record.msg = f"Rank {self._rank} | {record.msg}"
        return True


class Logger(object):
    def __init__(self, project, exp_name, save_dir, kwargs={}, distributed=False, enable_wandb=False):
        ...
        self.distributed = distributed
        self.log_queue = Queue(-1)
        self.logger = logging.getLogger(__name__)
        self.format = "%(message)s"
        self.datefmt = "[%X]"
        self.set_handlers()
        if self.distributed:
            # The main process drains the queue and forwards records to the file handlers.
            self.listener = QueueListener(
                self.log_queue,
                self.handlers[0],
                self.handlers[1],
                respect_handler_level=True,
            )
            self.listener.start()
        else:
            logging.basicConfig(
                format=self.format,
                datefmt=self.datefmt,
                handlers=self.handlers,
            )
        ...

    def set_handlers(self):
        info_handler = logging.FileHandler("info.log", mode="w")
        debug_handler = logging.FileHandler("debug.log", mode="w")
        info_handler.setLevel(logging.INFO)
        debug_handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
        info_handler.setFormatter(formatter)
        debug_handler.setFormatter(formatter)
        self.handlers = [info_handler, debug_handler]
        ...

trainer.py

import copy
import logging
import os
import random
from logging.handlers import QueueHandler

import torch
import torch.distributed as dist
import torch.multiprocessing as mp  # (or the stdlib multiprocessing)
from torch.nn.parallel import DistributedDataParallel as DDP

from logger import Logger, WorkerLogFilter


class Trainer(object):
    def __init__(self, module, dataset, distributed, **kwargs):
        self.module = module
        self.dataset = dataset
        self.kwargs = copy.deepcopy(kwargs)
        self.trainer_kwargs = self.kwargs['trainer_kwargs']
        self.distributed = distributed
        self.logger = Logger(
            self.kwargs['project'],
            self.kwargs['exp_name'],
            self.kwargs['save_dir'],
            self.kwargs,
            distributed
        )
        self.seed = self.trainer_kwargs['seed'] if 'seed' in self.trainer_kwargs.keys() else random.randint(1, 10000)
        self.history = {'train': {'loss': [], 'correct': []}, 'val': {'loss': [], 'correct': []}}
        self.epoch_history = {'train': {'loss': [], 'acc': []}, 'val': {'loss': [], 'acc': []}}

    def init(self, device):
        torch.backends.cudnn.enabled = False
        torch.manual_seed(self.seed)
        torch.cuda.manual_seed(self.seed)
        torch.cuda.set_device(device)
    ...
    def _fit(self):
        module = copy.deepcopy(self.module)
        dataset = copy.deepcopy(self.dataset)
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        if self.distributed:
            device = torch.device(f"cuda:{dist.get_rank()}")
            self.init(dist.get_rank())
        else:
            self.init(0)
        module.model = module.model.to(device)
        if self.distributed:
            module.model = DDP(module.model, device_ids=[dist.get_rank()], output_device=dist.get_rank())
            dataset.dist_sampler()
        module.init_optimizer()
        self.before_fit(dataset)
        for epoch in self.epoch_loop:
            self.start_epoch(epoch)
            self.step('train', module, dataset, self.step_loop, self.epoch_loop, epoch, device)
            self.step('val', module, dataset, self.step_loop, self.epoch_loop, epoch, device)
            self.end_epoch(module, epoch)

    def setup_worker_logging(self, rank):
        # Route this worker's records to the main process through the shared queue.
        queue_handler = QueueHandler(self.logger.log_queue)
        queue_handler.addFilter(WorkerLogFilter(rank))
        root_logger = logging.getLogger()
        root_logger.addHandler(queue_handler)

    def init_process(
        self,
        world_size,      # number of workers
        backend='nccl'   # the best backend for CUDA
    ):
        rank = dist.get_rank()
        self.setup_worker_logging(rank)
        torch.cuda.set_device(rank)
        os.environ['MASTER_ADDR'] = self.trainer_kwargs['ip']
        os.environ['MASTER_PORT'] = self.trainer_kwargs['port']
        dist.init_process_group(backend, rank=rank, world_size=world_size)
        dist.barrier()
        self._fit()

    def fit(self):
        if self.distributed:
            world_size = len(self.trainer_kwargs['gpus'])
            processes = []
            try:
                for rank in range(world_size):
                    print(f"Starting process {rank}")
                    p = mp.Process(target=self.init_process, args=(world_size,))
                    processes.append(p)
                for p in processes:
                    p.start()
                for p in processes:
                    p.join()
            except RuntimeError:
                print("ERROR: unable to start multiple processes")
        else:
            self._fit()

train.py

import hydra
import torch
from omegaconf import DictConfig, OmegaConf

# Project-local imports (Dataset, Module, Trainer) omitted here.


@hydra.main(config_path="cfg", config_name="config")
def main(cfg: DictConfig) -> None:
    config = OmegaConf.to_yaml(cfg)
    root_dir = cfg.project.root_dir
    distributed = len(cfg.exp.gpus) > 1 if 'gpus' in cfg.exp.keys() else False
    if distributed:
        # CUDA in subprocesses requires the "spawn" (or "forkserver") start method.
        torch.multiprocessing.set_start_method("spawn", force=True)
    dataset_kwargs = {...}
    dataset = Dataset(...)
    module_kwargs = {...}
    module = Module(...)
    trainer_kwargs = {...}
    exp_kwargs = {...}
    trainer = Trainer(module, dataset, distributed=distributed, **exp_kwargs)
    trainer.fit()


if __name__ == "__main__":
    main()

If you want me to give you the entire code, let me know. Thanks!

Thanks for posting your question. From the error, it looks like you are hitting the same problem as "Trying to run two different functions at the same time with shared queue and get an error". Do we have any general guidelines for customized loggers, @rvarm1?

Update: This looks like a common multiprocessing issue rather than anything DDP-specific, so you might want to check why the condition mentioned above is happening in your setup. Thanks!
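To illustrate why it is not DDP-specific: with the "spawn" start method, the Process object (including the bound `self` of its target method) has to be pickled before the child starts, and thread locks cannot be pickled. A minimal sketch with made-up names:

import multiprocessing as mp
import threading


class Holder:
    # Stands in for a Trainer whose attributes hold thread locks,
    # e.g. logging handlers or a started QueueListener.
    def __init__(self):
        self.lock = threading.RLock()  # not picklable

    def work(self):
        print("running in child process")


if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)
    holder = Holder()
    p = mp.Process(target=holder.work)  # the bound method drags `holder` along
    try:
        p.start()  # spawn pickles the Process object here
    except TypeError as e:
        print(f"pickling failed: {e}")  # cannot pickle '_thread.RLock' object

In your setup, `self.logger` holds file handlers and a QueueListener, both of which keep locks internally, so that is most likely what trips the pickling when `mp.Process(target=self.init_process, ...)` tries to serialize `self`.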
