Hello,
I am trying to combine FSDP with tensor parallelism (TP) on a simple autoencoder. On a single node with multiple GPUs it works fine (device_mesh = init_device_mesh("cuda", (2, 2), mesh_dim_names=("dp", "tp"))), but it fails when I scale out to multiple nodes. Each node has 4 GPUs.
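For reference, the single-node run is essentially the same script shown further down, just with the 2 x 2 mesh instead of 4 x 4. A condensed sketch of that working setup (AutoEncoder2 and parallel_plan_ae_nl6 are my own model and TP plan, defined elsewhere in the script):

device_mesh = init_device_mesh("cuda", (2, 2), mesh_dim_names=("dp", "tp"))  # 2 * 2 = 4 ranks on one node
tp_mesh = device_mesh["tp"]
dp_mesh = device_mesh["dp"]
model = AutoEncoder2(input_dim=n_features, nlayers=6, latent_dim=3)
tp_model = parallelize_module(model, tp_mesh, parallelize_plan=parallel_plan_ae_nl6)
sharded_model = FSDP(tp_model, device_mesh=dp_mesh, use_orig_params=True).to("cuda")

When I run the multi-node version (4 nodes, (4, 4) mesh), rank 0 crashes in the forward pass with the following error: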
[rank0]: Traceback (most recent call last):
[rank0]:   File "/gpfs/scratch/bsc23/MN4/bsc23/bsc23645/AE_tp/tp_test.py", line 111, in <module>
[rank0]:     output = sharded_model(inp)
[rank0]:              ^^^^^^^^^^^^^^^^^^
[rank0]:   File "/gpfs/apps/MN5/ACC/PYTORCH/2.4.0/torch/nn/modules/module.py", line 1532, in _wrapped_call_impl
[rank0]:     return self._call_impl(*args, **kwargs)
[rank0]:            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[rank0]:   File "/gpfs/apps/MN5/ACC/PYTORCH/2.4.0/torch/nn/modules/module.py", line 1541, in _call_impl
[rank0]:     return forward_call(*args, **kwargs)
[rank0]:            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[rank0]:   File "/gpfs/apps/MN5/ACC/PYTORCH/2.4.0/torch/distributed/fsdp/fully_sharded_data_parallel.py", line 823, in forward
[rank0]:     args, kwargs = _pre_forward(
[rank0]:                    ^^^^^^^^^^^^^
[rank0]:   File "/gpfs/apps/MN5/ACC/PYTORCH/2.4.0/torch/distributed/fsdp/_runtime_utils.py", line 380, in _pre_forward
[rank0]:     unshard_fn(state, handle)
[rank0]:   File "/gpfs/apps/MN5/ACC/PYTORCH/2.4.0/torch/distributed/fsdp/_runtime_utils.py", line 415, in _pre_forward_unshard
[rank0]:     _unshard(state, handle, state._unshard_stream, state._pre_unshard_stream)
[rank0]:   File "/gpfs/apps/MN5/ACC/PYTORCH/2.4.0/torch/distributed/fsdp/_runtime_utils.py", line 299, in _unshard
[rank0]:     handle.unshard()
[rank0]:   File "/gpfs/apps/MN5/ACC/PYTORCH/2.4.0/torch/distributed/fsdp/_flat_param.py", line 1308, in unshard
[rank0]:     padded_unsharded_flat_param = self._all_gather_flat_param(unsharded_flat_param)
[rank0]:                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[rank0]:   File "/gpfs/apps/MN5/ACC/PYTORCH/2.4.0/torch/distributed/fsdp/_flat_param.py", line 1399, in _all_gather_flat_param
[rank0]:     dist.all_gather_into_tensor(
[rank0]:   File "/gpfs/apps/MN5/ACC/PYTORCH/2.4.0/torch/distributed/c10d_logger.py", line 78, in wrapper
[rank0]:     return func(*args, **kwargs)
[rank0]:            ^^^^^^^^^^^^^^^^^^^^^
[rank0]:   File "/gpfs/apps/MN5/ACC/PYTORCH/2.4.0/torch/distributed/distributed_c10d.py", line 2971, in all_gather_into_tensor
[rank0]:     work = group._allgather_base(output_tensor, input_tensor, opts)
[rank0]:            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[rank0]: torch.distributed.DistBackendError: NCCL error in: /gpfs/apps/MN5/ACC/PYTORCH/2.4.0/torch/csrc/distributed/c10d/NCCLUtils.hpp:275, invalid usage (run with NCCL_DEBUG=WARN for details), NCCL version 2.20.5
[rank0]: ncclInvalidUsage: This usually reflects invalid usage of NCCL library.
[rank0]: Last error:
[rank0]: Duplicate GPU detected : rank 0 and rank 1 both on CUDA device 1b000
Here is the relevant part of tp_test.py (get_logger, rank_log, AutoEncoder2, and parallel_plan_ae_nl6 are helpers defined elsewhere in the script):
import os
import numpy as np
import torch
from torch.utils.data import DataLoader, DistributedSampler
from torch.distributed.device_mesh import init_device_mesh
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.tensor.parallel import parallelize_module

logger = get_logger()
world_size = int(os.environ["WORLD_SIZE"])
rank = int(os.environ["RANK"])
local_rank = int(os.environ["LOCAL_RANK"])
# Device Mesh
device_mesh = init_device_mesh("cuda", (4, 4), mesh_dim_names=("dp", "tp"))
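# (dp=4, tp=4) -> 16 ranks in total, matching WORLD_SIZE for 4 nodes x 4 GPUs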
tp_mesh = device_mesh["tp"]
dp_mesh = device_mesh["dp"]
dp_rank = dp_mesh.get_local_rank()
# Prepare Dataset
dataset = np.load("../ML/Simulations/NPY/WTx2_apo_CA_ChainsA_DRY_imagedFit.npy")
dataset = np.reshape(dataset, (len(dataset), -1))
num_frames, n_features = dataset.shape
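# DistributedSampler below splits the frames across all world_size ranks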
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True, seed=42, drop_last=False)
dataloader = DataLoader(dataset, batch_size=1, sampler=sampler, drop_last=True)
model = AutoEncoder2(input_dim=n_features, nlayers=6, latent_dim=3)
tp_model = parallelize_module(
model,
tp_mesh,
parallelize_plan=parallel_plan_ae_nl6)
# FSDP: shard the TP-parallelized model across the dp mesh dimension
sharded_model = FSDP(tp_model, device_mesh=dp_mesh, use_orig_params=True).to("cuda")
# Create an optimizer for the parallelized and sharded model.
lr = 1e-4
rank_log(rank, logger, f"Creating Adam optimizer with learning rate {lr}")
optimizer = torch.optim.AdamW(sharded_model.parameters(), lr=lr, foreach=True)
# Training loop:
num_epochs = 10
train_losses = []
model.train()
for epoch in range(num_epochs):
    for i, inp in enumerate(dataloader):
        torch.manual_seed(i + dp_rank)
        inp = inp.to(torch.cuda.current_device())
        output = sharded_model(inp)
        loss = torch.nn.functional.mse_loss(output, inp)
        train_losses.append(loss.item())
        optimizer.zero_grad()
        loss.sum().backward()
        optimizer.step()
    train_loss = torch.mean(torch.tensor(train_losses))
Here is my SLURM job:
#!/bin/bash
#SBATCH --job-name=multinode-fsdp
#SBATCH -o %x.out
#SBATCH -e %x.err
#SBATCH --nodes=4
#SBATCH --ntasks=80
#SBATCH --qos=acc_debug
#SBATCH -t 0-02:00:00
#SBATCH --gres=gpu:4
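# 4 GPUs per node, 4 nodes -> 16 GPUs in total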
nodes=( $( scontrol show hostnames $SLURM_JOB_NODELIST ) )
nodes_array=($nodes)
head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)
echo Node IP: $head_node_ip
export LOGLEVEL=INFO
#srun torchrun
srun torchrun \
--nnodes 4 \
--nproc_per_node 4 \
--rdzv_id $RANDOM \
--rdzv_backend c10d \
--rdzv_endpoint $head_node_ip:29501 \
tp_test.py &> tp-fsdp_output.log
What am I doing wrong? I'm new to distributed training, so any help would be appreciated.