Hi community! I am trying to follow the DDP (Distributed Data Parallel) guidance (Guide 1, Guide 2) to deploy my deep learning model to AWS SageMaker.
However, the training job (1 instance with 4 GPUs, ml.g5.12xlarge) fails with "RuntimeError: SMDDP does not support: ReduceOp"; the full traceback is below.
How can I fix this error? Any suggestions or comments would be greatly appreciated. Thank you!
Model Code:
import argparse
import json
import os

import torch
from torch.distributed import init_process_group, destroy_process_group
from torchmetrics.functional import pairwise_cosine_similarity
from lightning.pytorch.callbacks import EarlyStopping, ModelCheckpoint
from lightning.pytorch.loggers import CSVLogger
import smdistributed.dataparallel.torch.torch_smddp  # registers the "smddp" backend
from lightning.pytorch.strategies import DDPStrategy
from lightning.fabric.plugins.environments.lightning import LightningEnvironment
import lightning as pl

# ImagePairDataModule and Siamese are defined elsewhere in the training script (not shown).

# Let Lightning pick up the world size / global rank set by the SageMaker launcher
env = LightningEnvironment()
env.world_size = lambda: int(os.environ["WORLD_SIZE"])
env.global_rank = lambda: int(os.environ["RANK"])


def main(args):
    train_samples = 1000
    val_samples = 200
    test_samples = 200

    csv_logger = CSVLogger(save_dir=args.model_dir, name=args.modelname)

    # Initialize the DataModule
    data_module = ImagePairDataModule(
        data_save_folder=args.data_dir,
        train_samples=train_samples,
        val_samples=val_samples,
        test_samples=test_samples,
        batch_size=args.batch_size,
        num_workers=12,
    )

    # Initialize the model
    model = Siamese()

    # Configure checkpoint callback to save the best model
    checkpoint_callback = ModelCheckpoint(
        monitor="val_loss",               # Monitor validation loss
        dirpath=args.model_dir,           # Directory to save model checkpoints
        filename="best-checkpoint-test",  # File name of the checkpoint
        save_top_k=1,                     # Only save the best model
        mode="min",                       # Save when validation loss is minimized
        save_on_train_epoch_end=False,
    )

    # Configure early stopping to stop training if validation loss doesn't improve
    early_stopping_callback = EarlyStopping(
        monitor="val_loss",      # Monitor validation loss
        patience=args.patience,  # Number of epochs with no improvement before stopping
        mode="min",              # Stop when validation loss is minimized
    )

    # Set up DDP on SageMaker
    # https://docs.aws.amazon.com/sagemaker/latest/dg/data-parallel-modify-sdp-pt-lightning.html
    ddp = DDPStrategy(
        cluster_environment=env,
        process_group_backend="smddp",
        accelerator="gpu",
    )

    # Initialize the PyTorch Lightning Trainer
    trainer = pl.Trainer(
        max_epochs=args.epochs,
        strategy=ddp,                       # Distributed Data Parallel strategy
        devices=torch.cuda.device_count(),  # Use all available GPUs
        precision=16,                       # Use mixed precision (16-bit)
        callbacks=[checkpoint_callback, early_stopping_callback],
        log_every_n_steps=10,
        logger=csv_logger,
    )

    # Train the model
    trainer.fit(model, datamodule=data_module)

    best_model_path = checkpoint_callback.best_model_path
    print(f"Saving best model to: {best_model_path}")

    # Destroy the process group if distributed training was initialized
    if torch.distributed.is_initialized():
        torch.distributed.destroy_process_group()


if __name__ == "__main__":
    # Set up argument parser for command-line arguments
    parser = argparse.ArgumentParser()

    # Training arguments
    parser.add_argument('--epochs', type=int, default=10)
    parser.add_argument('--batch_size', type=int, default=256)
    parser.add_argument('--patience', type=int, default=10)
    parser.add_argument('--modelname', type=str, default='testing_model')

    # Container environment
    parser.add_argument("--hosts", type=list, default=json.loads(os.environ["SM_HOSTS"]))
    parser.add_argument("--current-host", type=str, default=os.environ["SM_CURRENT_HOST"])
    parser.add_argument("--model-dir", type=str, default=os.environ["SM_MODEL_DIR"])
    parser.add_argument("--data-dir", type=str, default=os.environ["SM_CHANNEL_TRAINING"])
    parser.add_argument("--num-gpus", type=int, default=os.environ["SM_NUM_GPUS"])

    # Parse arguments
    args = parser.parse_args()

    # Ensure the model directory exists
    os.makedirs(args.model_dir, exist_ok=True)

    # Launch the main function
    main(args)
Training Job Code:
hyperparameters_set = {
    'epochs': 100,                 # Total number of epochs
    'batch_size': 200,             # Input batch size on each device
    'patience': 10,                # Early stopping patience
    'modelname': model_task_name,  # Name for the model
}

estimator = PyTorch(
    entry_point="model_01.py",
    source_dir="./sage_code_300",
    output_path=jobs_folder + "/",
    code_location=jobs_folder,
    role=role,
    input_mode='FastFile',
    py_version="py310",
    framework_version="2.2.0",
    instance_count=1,
    instance_type="ml.g5.12xlarge",
    hyperparameters=hyperparameters_set,
    volume_size=800,
    distribution={'pytorchddp': {'enabled': True}},
    dependencies=["./sage_code/requirements.txt"],
)

estimator.fit(
    {"training": inputs},
    job_name=job_name,
    wait=False,
    logs=True,
)
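One detail I am not sure about: the training script selects process_group_backend="smddp", while the estimator enables the generic pytorchddp launcher. The SageMaker data parallel guide linked in the script comment instead enables the SMDDP library via the smdistributed option of the distribution argument. A minimal sketch of that variant (other estimator arguments unchanged; I don't know whether this mismatch is related to the error):

estimator = PyTorch(
    entry_point="model_01.py",
    source_dir="./sage_code_300",
    role=role,
    framework_version="2.2.0",
    py_version="py310",
    instance_count=1,
    instance_type="ml.g5.12xlarge",
    hyperparameters=hyperparameters_set,
    # Enable the SageMaker distributed data parallel (SMDDP) library, which
    # provides the "smddp" process group backend used in the training script.
    distribution={"smdistributed": {"dataparallel": {"enabled": True}}},
)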
Error messages and logs
2024-11-09 00:25:02 Uploading - Uploading generated training model
2024-11-09 00:25:02 Failed - Training job failed
---------------------------------------------------------------------------
UnexpectedStatusException Traceback (most recent call last)
Cell In[26], line 34
14 estimator = PyTorch(
15 entry_point = "model_01.py",
16 source_dir = "./sage_code_300",
(...)
29 dependencies=["./sage_code_300/requirements.txt"],
30 )
32 ######### Run the model #############
33 # Send the model to sage training jobs
---> 34 estimator.fit({"training": inputs},
35 job_name = job_name,
36 wait = True, # True
37 logs=True)
40 model_will_save_path = os.path.join(jobs_folder, job_name, "output", "model.tar.gz")
41 print(f"\nModel is saved at:\n\n{model_will_save_path}")
File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/workflow/pipeline_context.py:346, in runnable_by_pipeline.<locals>.wrapper(*args, **kwargs)
342 return context
344 return _StepArguments(retrieve_caller_name(self_instance), run_func, *args, **kwargs)
--> 346 return run_func(*args, **kwargs)
File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/estimator.py:1376, in EstimatorBase.fit(self, inputs, wait, logs, job_name, experiment_config)
1374 forward_to_mlflow_tracking_server = True
1375 if wait:
-> 1376 self.latest_training_job.wait(logs=logs)
1377 if forward_to_mlflow_tracking_server:
1378 log_sagemaker_job_to_mlflow(self.latest_training_job.name)
File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/estimator.py:2750, in _TrainingJob.wait(self, logs)
2748 # If logs are requested, call logs_for_jobs.
2749 if logs != "None":
-> 2750 self.sagemaker_session.logs_for_job(self.job_name, wait=True, log_type=logs)
2751 else:
2752 self.sagemaker_session.wait_for_job(self.job_name)
File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/session.py:5945, in Session.logs_for_job(self, job_name, wait, poll, log_type, timeout)
5924 def logs_for_job(self, job_name, wait=False, poll=10, log_type="All", timeout=None):
5925 """Display logs for a given training job, optionally tailing them until job is complete.
5926
5927 If the output is a tty or a Jupyter cell, it will be color-coded
(...)
5943 exceptions.UnexpectedStatusException: If waiting and the training job fails.
5944 """
-> 5945 _logs_for_job(self, job_name, wait, poll, log_type, timeout)
File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/session.py:8547, in _logs_for_job(sagemaker_session, job_name, wait, poll, log_type, timeout)
8544 last_profiler_rule_statuses = profiler_rule_statuses
8546 if wait:
-> 8547 _check_job_status(job_name, description, "TrainingJobStatus")
8548 if dot:
8549 print()
File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/session.py:8611, in _check_job_status(job, desc, status_key_name)
8605 if "CapacityError" in str(reason):
8606 raise exceptions.CapacityError(
8607 message=message,
8608 allowed_statuses=["Completed", "Stopped"],
8609 actual_status=status,
8610 )
-> 8611 raise exceptions.UnexpectedStatusException(
8612 message=message,
8613 allowed_statuses=["Completed", "Stopped"],
8614 actual_status=status,
8615 )
UnexpectedStatusException: Error for Training job model-res50-300k-noft-contra-aug-2024-11-09-00-18-33: Failed. Reason: AlgorithmError: ExecuteUserScriptError:
ExitCode 1
ErrorMessage "RuntimeError: SMDDP does not support: ReduceOp
Traceback (most recent call last)
File "/opt/conda/lib/python3.10/runpy.py", line 196, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/opt/conda/lib/python3.10/runpy.py", line 86, in _run_code
exec(code, run_globals)
File "/opt/conda/lib/python3.10/site-packages/mpi4py/__main__.py", line 7, in <module>
main()
File "/opt/conda/lib/python3.10/site-packages/mpi4py/run.py", line 230, in main
run_command_line(args)
File "/opt/conda/lib/python3.10/site-packages/mpi4py/run.py", line 47, in run_command_line
run_path(sys.argv[0], run_name='__main__')
File "/opt/conda/lib/python3.10/runpy.py", line 289, in run_path
return _run_module_code(code, init_globals, run_name,
File "/opt/conda/lib/python3.10/runpy.py", line 96, in _run_module_code
_run_code(code, mod_globals, init_globals,
File "s4_mod_pl_cloud_01.py", line 369, in <module>
main(args)
File "s4_mod_pl_cloud_01. Check troubleshooting guide for common errors: https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-python-sdk-troubleshooting.html
Environment
- PyTorch Lightning version: 2.4.0
- PyTorch version: 2.2
- Python version: 3.10
- OS: Linux (Amazon Linux 2)
- CUDA/cuDNN version: 12.4
- GPU models and configuration: NVIDIA A10G (4 GPUs on one ml.g5.12xlarge instance)
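For anyone trying to reproduce this, a minimal standalone check of the smddp backend outside Lightning would look roughly like the sketch below. It assumes the SageMaker PyTorch DLC (where smdistributed is preinstalled) and that the launcher sets the usual torch.distributed environment variables (WORLD_SIZE, RANK, LOCAL_RANK, MASTER_ADDR, MASTER_PORT).

import os
import torch
import torch.distributed as dist
import smdistributed.dataparallel.torch.torch_smddp  # registers the "smddp" backend

# Initialize the process group with the SMDDP backend
dist.init_process_group(backend="smddp")

# Pin each process to its local GPU
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)

# ReduceOp.SUM is the default reduction for all_reduce
x = torch.ones(1, device=f"cuda:{local_rank}")
dist.all_reduce(x, op=dist.ReduceOp.SUM)
print(f"rank {dist.get_rank()}: all_reduce result = {x.item()}")

dist.destroy_process_group()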