Elastic agents cannot properly shutdown/restart after failure

I am trying to run distributed training using the Elastic Job Controller (from here: elastic/kubernetes at master · pytorch/elastic · GitHub), and while testing, the agents do not seem to be able to shut themselves down properly. Here is a stack trace of what eventually happens when one of the pods dies:

[E ProcessGroupGloo.cpp:136] Rank 1 successfully reached monitoredBarrier, but received errors while waiting for send/recv from rank 0. Please check rank 0 logs for faulty rank.
Error executing job with overrides: ['learning_rate=single', 'model.architecture=efflitenet40', 'dataset.val.dataset_dpath=/data/wild_v4_tiny_fr_val', 'optimization.batch_size=256', 'backend.num_nodes=4', 'experiment_id=yoanis-autoresume-01', 'logging.ckpt_nsteps=500', 'logging.run_id=c61a45bf-6a05-4435-944b-ecc895b75a19', 'backend.num_nodes=4', 'resume_training.auto_resume=True', 'backend.wandb_group=multinode-c61a45bf-6a05-4435-944b-ecc895b75a19', 'backend.num_cores=1']
Traceback (most recent call last):
File “pl_trainer.py”, line 422, in main
trainer.fit(pl_model, pl_data)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/trainer/trainer.py”, line 768, in fit
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/trainer/trainer.py”, line 721, in _call_and_handle_interrupt
return trainer_fn(*args, **kwargs)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/trainer/trainer.py”, line 809, in _fit_impl
results = self._run(model, ckpt_path=self.ckpt_path)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/trainer/trainer.py”, line 1234, in _run
results = self._run_stage()
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/trainer/trainer.py”, line 1321, in _run_stage
return self._run_train()
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/trainer/trainer.py”, line 1351, in _run_train
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/loops/base.py”, line 204, in run
self.advance(*args, **kwargs)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/loops/fit_loop.py”, line 269, in advance
self._outputs = self.epoch_loop.run(self._data_fetcher)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/loops/base.py”, line 204, in run
self.advance(*args, **kwargs)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/loops/epoch/training_epoch_loop.py”, line 208, in advance
batch_output = self.batch_loop.run(batch, batch_idx)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/loops/base.py”, line 204, in run
self.advance(*args, **kwargs)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/loops/batch/training_batch_loop.py”, line 88, in advance
outputs = self.optimizer_loop.run(split_batch, optimizers, batch_idx)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/loops/base.py”, line 204, in run
self.advance(*args, **kwargs)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/loops/optimization/optimizer_loop.py”, line 203, in advance
result = self._run_optimization(
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/loops/optimization/optimizer_loop.py”, line 256, in _run_optimization
self._optimizer_step(optimizer, opt_idx, batch_idx, closure)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/loops/optimization/optimizer_loop.py”, line 369, in _optimizer_step
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/trainer/trainer.py”, line 1593, in _call_lightning_module_hook
output = fn(*args, **kwargs)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/core/lightning.py”, line 1625, in optimizer_step
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/core/optimizer.py”, line 168, in step
step_output = self._strategy.optimizer_step(self._optimizer, self._optimizer_idx, closure, **kwargs)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/strategies/ddp.py”, line 278, in optimizer_step
optimizer_output = super().optimizer_step(optimizer, opt_idx, closure, model, **kwargs)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/strategies/strategy.py”, line 193, in optimizer_step
return self.precision_plugin.optimizer_step(model, optimizer, opt_idx, closure, **kwargs)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/plugins/precision/native_amp.py”, line 85, in optimizer_step
closure_result = closure()
File "/usr/local/lib/python3.8/dist-packages/pytorch_lightning/loops/optimization/optimizer_loop.py", line 148, in __call__
self._result = self.closure(*args, **kwargs)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/loops/optimization/optimizer_loop.py”, line 134, in closure
step_output = self._step_fn()
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/loops/optimization/optimizer_loop.py”, line 427, in _training_step
training_step_output = self.trainer._call_strategy_hook(“training_step”, *step_kwargs.values())
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/trainer/trainer.py”, line 1763, in _call_strategy_hook
output = fn(*args, **kwargs)
File “/usr/local/lib/python3.8/dist-packages/pytorch_lightning/strategies/ddp.py”, line 341, in training_step
return self.model(*args, **kwargs)
File “/usr/local/lib/python3.8/dist-packages/torch/nn/modules/module.py”, line 1110, in _call_impl
return forward_call(*input, **kwargs)
File “/usr/local/lib/python3.8/dist-packages/torch/nn/parallel/distributed.py”, line 955, in forward
File “/usr/local/lib/python3.8/dist-packages/torch/nn/parallel/distributed.py”, line 1602, in _sync_buffers
File “/usr/local/lib/python3.8/dist-packages/torch/nn/parallel/distributed.py”, line 1606, in _sync_module_buffers
File “/usr/local/lib/python3.8/dist-packages/torch/nn/parallel/distributed.py”, line 1627, in _default_broadcast_coalesced
File “/usr/local/lib/python3.8/dist-packages/torch/nn/parallel/distributed.py”, line 1543, in _distributed_broadcast_coalesced
RuntimeError: Rank 1 successfully reached monitoredBarrier, but received errors while waiting for send/recv from rank 0. Please check rank 0 logs for faulty rank.
Original exception:
[…/third_party/gloo/gloo/transport/tcp/pair.cc:598] Connection closed by peer []:40678

Set the environment variable HYDRA_FULL_ERROR=1 for a complete stack trace.
wandb: Waiting for W&B process to finish… (failed 1). Press Control-C to abort syncing.
wandb: Synced playful-jazz-804: _____
wandb: Synced 5 W&B file(s), 0 media file(s), 0 artifact file(s) and 0 other file(s)
wandb: Find logs at: ./wandb/run-20220421_193319-ny9ee8dv/logs

ERROR:torch.distributed.elastic.multiprocessing.api:failed (exitcode: 1) local_rank: 0 (pid: 421) of binary: /usr/bin/python
INFO:torch.distributed.elastic.agent.server.api:Local worker group finished (FAILED). Waiting 300 seconds for other agents to finish
ERROR:torch.distributed.elastic.agent.server.api:Error waiting on exit barrier. Elapsed: 300.1109097003937 seconds
Traceback (most recent call last):
File “/usr/local/lib/python3.8/dist-packages/torch/distributed/elastic/agent/server/api.py”, line 906, in _exit_barrier
File “/usr/local/lib/python3.8/dist-packages/torch/distributed/elastic/utils/store.py”, line 67, in barrier
synchronize(store, data, rank, world_size, key_prefix, barrier_timeout)
File “/usr/local/lib/python3.8/dist-packages/torch/distributed/elastic/utils/store.py”, line 53, in synchronize
agent_data = get_all(store, key_prefix, world_size)
File “/usr/local/lib/python3.8/dist-packages/torch/distributed/elastic/utils/store.py”, line 31, in get_all
data = store.get(f"{prefix}{idx}")
File “/usr/local/lib/python3.8/dist-packages/torch/distributed/elastic/rendezvous/etcd_store.py”, line 75, in get
raise LookupError(f"Key {key} not found in EtcdStore")
LookupError: Key torchelastic/agent/terminal_state3 not found in EtcdStore
INFO:torch.distributed.elastic.multiprocessing.errors:local_rank 1 FAILED with no error file. Decorate your entrypoint fn with @record for traceback info. See: _____
Traceback (most recent call last):
File “/usr/lib/python3.8/runpy.py”, line 194, in _run_module_as_main
return _run_code(code, main_globals, None,
File “/usr/lib/python3.8/runpy.py”, line 87, in _run_code
exec(code, run_globals)
File "/usr/local/lib/python3.8/dist-packages/torch/distributed/run.py", line 728, in <module>
File "/usr/local/lib/python3.8/dist-packages/torch/distributed/elastic/multiprocessing/errors/__init__.py", line 345, in wrapper
return f(*args, **kwargs)
File “/usr/local/lib/python3.8/dist-packages/torch/distributed/run.py”, line 724, in main
File “/usr/local/lib/python3.8/dist-packages/torch/distributed/run.py”, line 715, in run
File "/usr/local/lib/python3.8/dist-packages/torch/distributed/launcher/api.py", line 131, in __call__
return launch_agent(self._config, self._entrypoint, list(args))
File “/usr/local/lib/python3.8/dist-packages/torch/distributed/launcher/api.py”, line 245, in launch_agent
raise ChildFailedError(

pl_trainer.py FAILED


Root Cause (first observed failure):
time : 2022-04-21_19:48:31
host : job-c61a45bf-6a05-4435-944b-ecc895b75a19-worker-0
rank : 1 (local_rank: 0)
exitcode : 1 (pid: 421)
error_file: <N/A>

I know this is a very complex issue and more information may be required, but I wanted to reach out to the broader community to see if anyone has run into a similar issue.

@Kiuk_Chung might help you further.

From the stack trace it looks like there was a legitimate error on one of the nodes, which doesn't make the workers on the other nodes fail (rather, they finish early). What you are observing is that on the nodes where the workers (falsely) finish successfully, the elastic agent waits 5 minutes by default for the workers on the other nodes to finish, and eventually times out because they never do.

It's hard to say what the exact root cause of the error is without looking at the error summary (the last few lines of the log file, the ones that start with Root Cause (first observed failure)...) from all the nodes.
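To make the timeout in the stack trace above concrete, here is an illustrative pure-Python sketch (not the real torch.distributed.elastic code) of the exit-barrier logic it shows: every agent writes a `terminal_state{rank}` key to the shared store and then reads every other agent's key; if a failed agent never wrote its key, the read fails with the `LookupError: Key torchelastic/agent/terminal_state3 not found in EtcdStore` seen above. The `DictStore` class is a hypothetical stand-in for the etcd-backed store (no timeouts).

```python
class DictStore:
    """Hypothetical stand-in for the etcd-backed store: a plain dict, no timeouts."""
    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def get(self, key):
        # The real store blocks until the key appears or a timeout expires;
        # here a missing key fails immediately with the same error type.
        if key not in self._data:
            raise LookupError(f"Key {key} not found in store")
        return self._data[key]


def exit_barrier(store, rank, world_size,
                 key_prefix="torchelastic/agent/terminal_state"):
    # Announce this agent's terminal state, then wait for everyone else's.
    store.set(f"{key_prefix}{rank}", b"done")
    return [store.get(f"{key_prefix}{idx}") for idx in range(world_size)]


store = DictStore()
# Agents 1 and 2 also reached the barrier and wrote their keys...
for r in (1, 2):
    store.set(f"torchelastic/agent/terminal_state{r}", b"done")
states = exit_barrier(store, rank=0, world_size=3)  # all keys present, barrier passes
# If an agent dies before writing its key, exit_barrier raises LookupError instead.
```

In the real agent the `get` blocks for the barrier timeout (300 s by default), which is why the healthy agents sit at "Waiting 300 seconds for other agents to finish" before failing.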

I suggest a few things:

  1. torchelastic has been upstreamed to PyTorch, and the k8s CRD in the elastic repo is no longer actively maintained. You can try launching DDP jobs with TorchX instead. Here are the instructions for that:
    1. Setting up on Kubernetes: Kubernetes — PyTorch/TorchX main documentation.
    2. Launching distributed jobs: Distributed — PyTorch/TorchX main documentation
  2. Turn on error reporting by annotating your trainer's main method with torch.distributed.elastic.multiprocessing.errors.record (follow the instructions here: Error Propagation — PyTorch 1.11.0 documentation). The error summary table should have prompted you to do this as well.
  3. Reply to this with the exact commands and parameters you ran (number of nodes, number of processes per node, your training script, etc.).
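Suggestion 2 above boils down to a one-line change in the training script. A minimal sketch (the body of main is a placeholder for the real trainer code):

```python
# Decorating the entrypoint with @record makes worker failures get written to
# an error file that the elastic agent can surface in the "Root Cause" summary
# (instead of error_file: <N/A> as in the log above).
from torch.distributed.elastic.multiprocessing.errors import record


@record
def main():
    # ... build the trainer and call trainer.fit(...) here ...
    return 0


if __name__ == "__main__":
    main()
```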

@Kiuk_Chung thanks for the reply!

To give you a bit more background, I was running a distributed job with the Elastic Job Controller and verifying that a training job would resume after one of the pods died, so I was essentially just doing kubectl delete pod POD_ID. The behavior I observed was that after killing one of the pods, the training job would not resume. I then realized that I had forgotten to set --max-restarts to a value higher than 1. Once I set --max-restarts to 2 or more, everything worked as expected (the training job resumed after a pod or VM died).
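For anyone hitting the same thing, here is an illustrative launcher invocation (a config sketch; the endpoint, job id, node counts, and script name are placeholders, not the exact spec from this thread). The key flag is --max_restarts: if it is too low, a worker-group failure is terminal and the job cannot restart after a pod dies.

```shell
python -m torch.distributed.run \
    --nnodes=1:4 \
    --nproc_per_node=1 \
    --max_restarts=3 \
    --rdzv_backend=etcd \
    --rdzv_endpoint=etcd-service:2379 \
    --rdzv_id=my-job-id \
    pl_trainer.py
```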

On a side topic, if I understand correctly, the official replacement for the TorchElastic Controller is TorchX? If that's the case, it would be really nice to include that reference in the elastic repo and make it more explicit. We're using the TorchElastic Controller at the moment, and I was worried that we were relying on a no-longer-maintained solution, but until now I had not been able to find an official replacement. Thanks for letting me know!

Glad you figured this one out! And thanks for the suggestion about updating the TorchElastic docs to point to TorchX; I just submitted a PR to update the README: (torchelastic) update README to point elastic CRD users to TorchX by kiukchung · Pull Request #167 · pytorch/elastic · GitHub
