Error when deploying a PyTorch Lightning model to AWS SageMaker training jobs: "SMDDP does not support: ReduceOp"

Hi community! I am trying to follow the DDP (Distributed Data Parallel) guides (Guide 1, Guide 2) to deploy my deep learning model to AWS SageMaker training jobs.

However, when the training job runs (1 ml.g5.12xlarge instance, 4 GPUs), it fails with the error below.

How can I fix this error? Any suggestions or comments would be greatly appreciated. Thank you!

Model Code:

import os
import json
import argparse

import torch
from torch.distributed import init_process_group, destroy_process_group
from torchmetrics.functional import pairwise_cosine_similarity
from lightning.pytorch.callbacks import EarlyStopping, ModelCheckpoint
from lightning.pytorch.loggers import CSVLogger

import smdistributed.dataparallel.torch.torch_smddp  # registers the "smddp" process group backend
from lightning.pytorch.strategies import DDPStrategy
from lightning.fabric.plugins.environments.lightning import LightningEnvironment
import lightning as pl

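# As in the AWS guide, point LightningEnvironment at the WORLD_SIZE / RANK
# environment variables that the SageMaker distributed launcher exports for each worker.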
env = LightningEnvironment()
env.world_size = lambda: int(os.environ["WORLD_SIZE"])
env.global_rank = lambda: int(os.environ["RANK"])

def main(args):
    
    train_samples = 1000 
    val_samples = 200 
    test_samples = 200 
    
    csv_logger = CSVLogger(save_dir=args.model_dir, name=args.modelname)
    
    # Initialize the DataModule
    data_module = ImagePairDataModule(
        data_save_folder=args.data_dir,
        train_samples=train_samples,
        val_samples=val_samples,
        test_samples=test_samples,
        batch_size=args.batch_size,
        num_workers=12,
    )

    # Initialize the model
    model = Siamese()

    # Configure checkpoint callback to save the best model
    checkpoint_callback = ModelCheckpoint(
        monitor="val_loss",                 # Monitor validation loss
        dirpath=args.model_dir,             # Directory to save model checkpoints
        filename="best-checkpoint-test",    # File name of the checkpoint
        save_top_k=1,                       # Only save the best model
        mode="min",                         # Save when validation loss is minimized
        save_on_train_epoch_end=False,
    )

    # Configure early stopping to stop training if validation loss doesn't improve
    early_stopping_callback = EarlyStopping(
        monitor="val_loss",                 # Monitor validation loss
        patience=args.patience,             # Number of epochs with no improvement before stopping
        mode="min",                         # Stop when validation loss is minimized
    )
    
    # Set up ddp on SageMaker
    # https://docs.aws.amazon.com/sagemaker/latest/dg/data-parallel-modify-sdp-pt-lightning.html
    ddp = DDPStrategy(
        cluster_environment=env, 
        process_group_backend="smddp", 
        accelerator="gpu"
    )

    # Initialize the PyTorch Lightning Trainer
    trainer = pl.Trainer(
        max_epochs=args.epochs,
        strategy=ddp,                      # Distributed Data Parallel strategy
        devices=torch.cuda.device_count(),    # Use all available GPUs
        precision=16,                        # Use mixed precision (16-bit)
        callbacks=[checkpoint_callback, early_stopping_callback],
        log_every_n_steps=10,
        logger=csv_logger,
    )

    # Train the model
    trainer.fit(model, datamodule=data_module)
    
    best_model_path = checkpoint_callback.best_model_path
    print(f"Saving best model to: {best_model_path}")
    
    # Destroy the process group if distributed training was initialized
    if torch.distributed.is_initialized():
        torch.distributed.destroy_process_group()

    
    
if __name__ == "__main__":
    
    # Set up argument parser for command-line arguments
    parser = argparse.ArgumentParser()
    
    # Adding arguments
    parser.add_argument('--epochs', type=int, default=10)
    parser.add_argument('--batch_size', type=int, default=256)
    parser.add_argument('--patience', type=int, default=10)
    parser.add_argument('--modelname', type=str, default='testing_model')
    
    # Container environment
    parser.add_argument("--hosts", type=list, default=json.loads(os.environ["SM_HOSTS"]))
    parser.add_argument("--current-host", type=str, default=os.environ["SM_CURRENT_HOST"])
    parser.add_argument("--model-dir", type=str, default=os.environ["SM_MODEL_DIR"])
    parser.add_argument("--data-dir", type=str, default=os.environ["SM_CHANNEL_TRAINING"])
    parser.add_argument("--num-gpus", type=int, default=os.environ["SM_NUM_GPUS"])

    # Parse arguments
    args = parser.parse_args()

    # Ensure the model directory exists
    os.makedirs(args.model_dir, exist_ok=True)

    # Launch the main function
    main(args)
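
To help narrow down which collective call SMDDP rejects, here is the kind of probe script I have in mind for isolating the failing op (a sketch only, not part of my training code; it assumes the launcher sets LOCAL_RANK and the usual distributed environment variables, and the list of reduce ops is just a guess at likely culprits):

import os
import torch
import torch.distributed as dist
import smdistributed.dataparallel.torch.torch_smddp  # registers the "smddp" backend

def probe_reduce_ops():
    # Initialize the same process group backend that DDPStrategy uses above
    dist.init_process_group(backend="smddp")
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    torch.cuda.set_device(local_rank)

    x = torch.ones(1, device=f"cuda:{local_rank}")
    # Try the reduce ops most likely to be issued by Lightning / torchmetrics syncs
    for op in (dist.ReduceOp.SUM, dist.ReduceOp.MAX, dist.ReduceOp.MIN, dist.ReduceOp.AVG):
        try:
            dist.all_reduce(x.clone(), op=op)
            if dist.get_rank() == 0:
                print(f"{op}: supported")
        except RuntimeError as err:
            if dist.get_rank() == 0:
                print(f"{op}: {err}")

    dist.destroy_process_group()

if __name__ == "__main__":
    probe_reduce_ops()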

Training Job Code:

hyperparameters_set = {
    'epochs': 100,                  # Total number of epochs
    'batch_size': 200,              # Input batch size on each device
    'patience': 10,                 # Early stopping patience
    'modelname': model_task_name,   # Name for the model
}

estimator = PyTorch(
    entry_point = "model_01.py",
    source_dir = "./sage_code_300",
    output_path = jobs_folder + "/",
    code_location = jobs_folder,
    role = role,
    input_mode = 'FastFile',
    py_version="py310",
    framework_version="2.2.0",
    instance_count=1,
    instance_type="ml.g5.12xlarge",
    hyperparameters=hyperparameters_set,
    volume_size=800,  
    distribution={'pytorchddp': {'enabled': True}},
    dependencies=["./sage_code/requirements.txt"],
)

estimator.fit({"training": inputs},
               job_name = job_name,
               wait = False,  
               logs=True) 
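
One thing I am unsure about: the training script requests the smddp process-group backend, but the estimator enables the pytorchddp distribution. The SMDDP documentation enables the library through the smdistributed option instead; my understanding of that configuration (a sketch with the other estimator arguments unchanged, and I have not confirmed that ml.g5.12xlarge is supported by SMDDP) is roughly:

estimator = PyTorch(
    entry_point="model_01.py",
    source_dir="./sage_code_300",
    output_path=jobs_folder + "/",
    code_location=jobs_folder,
    role=role,
    input_mode="FastFile",
    py_version="py310",
    framework_version="2.2.0",
    instance_count=1,
    instance_type="ml.g5.12xlarge",
    hyperparameters=hyperparameters_set,
    volume_size=800,
    # Enable the SageMaker distributed data parallel library itself,
    # rather than plain PyTorch DDP:
    distribution={"smdistributed": {"dataparallel": {"enabled": True}}},
    dependencies=["./sage_code/requirements.txt"],
)

Should the distribution option match the smddp backend chosen in DDPStrategy, or is pytorchddp expected to work here as well?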

Error messages and logs

2024-11-09 00:25:02 Uploading - Uploading generated training model
2024-11-09 00:25:02 Failed - Training job failed
---------------------------------------------------------------------------
UnexpectedStatusException                 Traceback (most recent call last)
Cell In[26], line 34
     14 estimator = PyTorch(
     15     entry_point = "model_01.py",
     16     source_dir = "./sage_code_300",
   (...)
     29     dependencies=["./sage_code_300/requirements.txt"],
     30 )
     32 ######### Run the model #############
     33 # Send the model to sage training jobs
---> 34 estimator.fit({"training": inputs},
     35                job_name = job_name,
     36                wait = True,  # True 
     37                logs=True) 
     40 model_will_save_path = os.path.join(jobs_folder, job_name, "output", "model.tar.gz")
     41 print(f"\nModel is saved at:\n\n{model_will_save_path}")

File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/workflow/pipeline_context.py:346, in runnable_by_pipeline.<locals>.wrapper(*args, **kwargs)
    342         return context
    344     return _StepArguments(retrieve_caller_name(self_instance), run_func, *args, **kwargs)
--> 346 return run_func(*args, **kwargs)

File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/estimator.py:1376, in EstimatorBase.fit(self, inputs, wait, logs, job_name, experiment_config)
   1374     forward_to_mlflow_tracking_server = True
   1375 if wait:
-> 1376     self.latest_training_job.wait(logs=logs)
   1377 if forward_to_mlflow_tracking_server:
   1378     log_sagemaker_job_to_mlflow(self.latest_training_job.name)

File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/estimator.py:2750, in _TrainingJob.wait(self, logs)
   2748 # If logs are requested, call logs_for_jobs.
   2749 if logs != "None":
-> 2750     self.sagemaker_session.logs_for_job(self.job_name, wait=True, log_type=logs)
   2751 else:
   2752     self.sagemaker_session.wait_for_job(self.job_name)

File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/session.py:5945, in Session.logs_for_job(self, job_name, wait, poll, log_type, timeout)
   5924 def logs_for_job(self, job_name, wait=False, poll=10, log_type="All", timeout=None):
   5925     """Display logs for a given training job, optionally tailing them until job is complete.
   5926 
   5927     If the output is a tty or a Jupyter cell, it will be color-coded
   (...)
   5943         exceptions.UnexpectedStatusException: If waiting and the training job fails.
   5944     """
-> 5945     _logs_for_job(self, job_name, wait, poll, log_type, timeout)

File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/session.py:8547, in _logs_for_job(sagemaker_session, job_name, wait, poll, log_type, timeout)
   8544             last_profiler_rule_statuses = profiler_rule_statuses
   8546 if wait:
-> 8547     _check_job_status(job_name, description, "TrainingJobStatus")
   8548     if dot:
   8549         print()

File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/session.py:8611, in _check_job_status(job, desc, status_key_name)
   8605 if "CapacityError" in str(reason):
   8606     raise exceptions.CapacityError(
   8607         message=message,
   8608         allowed_statuses=["Completed", "Stopped"],
   8609         actual_status=status,
   8610     )
-> 8611 raise exceptions.UnexpectedStatusException(
   8612     message=message,
   8613     allowed_statuses=["Completed", "Stopped"],
   8614     actual_status=status,
   8615 )

UnexpectedStatusException: Error for Training job model-res50-300k-noft-contra-aug-2024-11-09-00-18-33: Failed. Reason: AlgorithmError: ExecuteUserScriptError:
ExitCode 1
ErrorMessage "RuntimeError: SMDDP does not support: ReduceOp
 Traceback (most recent call last)
 File "/opt/conda/lib/python3.10/runpy.py", line 196, in _run_module_as_main
 return _run_code(code, main_globals, None,
 File "/opt/conda/lib/python3.10/runpy.py", line 86, in _run_code
 exec(code, run_globals)
 File "/opt/conda/lib/python3.10/site-packages/mpi4py/__main__.py", line 7, in <module>
 main()
 File "/opt/conda/lib/python3.10/site-packages/mpi4py/run.py", line 230, in main
 run_command_line(args)
 File "/opt/conda/lib/python3.10/site-packages/mpi4py/run.py", line 47, in run_command_line
 run_path(sys.argv[0], run_name='__main__')
 File "/opt/conda/lib/python3.10/runpy.py", line 289, in run_path
 return _run_module_code(code, init_globals, run_name,
 File "/opt/conda/lib/python3.10/runpy.py", line 96, in _run_module_code
 _run_code(code, mod_globals, init_globals,
 File "s4_mod_pl_cloud_01.py", line 369, in <module>
 main(args)
 File "s4_mod_pl_cloud_01. Check troubleshooting guide for common errors: https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-python-sdk-troubleshooting.html

Environment

- PyTorch Lightning version: 2.4.0
- PyTorch version: 2.2
- Python version: 3.10
- OS: Linux (Amazon Linux 2)
- CUDA/cuDNN version: 12.4
- GPU models and configuration: 4x NVIDIA A10G (ml.g5.12xlarge)