How to broadcast tensors using NCCL?

I’m unclear on how to broadcast tensors with NCCL from the rank 0 process to all other processes. I’ve tried a few approaches, but each attempt freezes the process and pins the GPUs at 100% utilization (checked via nvidia-smi). Presumably I’m not using broadcast correctly :frowning:

For example, I added a broadcasting snippet to this sample script from huggingface:

Please note that the script works perfectly before the broadcast code is added; that is, the all-reduce results are all correct.

import fcntl
import os
import socket
import torch
import torch.distributed as dist


def printflock(*msgs):
    """ solves multi-process interleaved print problem """
    with open(__file__, "r") as fh:
        fcntl.flock(fh, fcntl.LOCK_EX)
        try:
            print(*msgs)
        finally:
            fcntl.flock(fh, fcntl.LOCK_UN)


local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)
hostname = socket.gethostname()

gpu = f"[{hostname}-{local_rank}]"

try:
    # init
    # dist.init_process_group("nccl")

    os.environ["OMP_NUM_THREADS"] = "1"
    # os.environ["PYTHONHASHSEED"] = str(args.seed)
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'

    # torch.cuda.set_device(args.local_rank)
    dist.init_process_group(
        backend=dist.Backend.NCCL,
        init_method='env://',
        world_size=2,
        rank=local_rank
    )

    assert dist.is_initialized()
    assert dist.is_nccl_available()

    # global rank
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    # test all-reduce
    x = torch.ones(1).to(device)
    if local_rank == 0:
        x.add_(1)

    dist.all_reduce(x, op=dist.ReduceOp.SUM)
    dist.barrier()
    printflock(f"{gpu} x: {x}")

    # test all_reduce
    x = torch.tensor(1.0, device='cuda')
    # count = torch.zeros(1).to(device)
    if local_rank == 0:
        x.add_(3)
        # count.add_(1)
        # torch.cuda.comm.broadcast(count, devices=[1])
    #     dist.broadcast(count, src=local_rank)
    dist.all_reduce(x, op=dist.ReduceOp.AVG)
    dist.barrier()
    printflock(f"{gpu} x: {x}")

    ## test broadcast - code freezes in this block!
    # count = torch.zeros(1).to(device)
    # if local_rank == 0:
    #     count.add_(1)
    #     dist.broadcast(tensor=count, src=0)
    # dist.barrier()
    # printflock(f"{gpu} count: {count}")

    # test cuda is available and can allocate memory
    torch.cuda.is_available()
    torch.ones(1).cuda(local_rank)

    printflock(f"{gpu} is OK (global rank: {rank + 1}/{world_size})")

    dist.barrier()
    if rank == 0:
        printflock(f"pt={torch.__version__}, cuda={torch.version.cuda}, nccl={torch.cuda.nccl.version()}")
        printflock(f"device compute capabilities={torch.cuda.get_device_capability()}")
        printflock(f"pytorch compute capabilities={torch.cuda.get_arch_list()}")

except Exception:
    printflock(f"{gpu} is broken")
    raise

I ran it as follows:

CUDA_VISIBLE_DEVICES=0,1 python -m torch.distributed.launch --nproc_per_node=2 --nnodes=1 --node_rank=0 torch-distributed-gpu-test.py
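For reference, on PyTorch 1.10+ a roughly equivalent invocation uses torchrun, which supersedes torch.distributed.launch and always exports the LOCAL_RANK environment variable the script reads:

CUDA_VISIBLE_DEVICES=0,1 torchrun --nproc_per_node=2 --nnodes=1 --node_rank=0 torch-distributed-gpu-test.py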

The code freezes in the broadcast portion.

Thanks!

cc: @ptrblck :slight_smile:


I’m having similar issues. broadcast either hangs the program or doesn’t broadcast at all; the other processes still have the old tensor.

@vgoklani @Li_Shen

I answered this in another post: *deadlock* when using torch.distributed.broadcast

Basically, when you add collectives like broadcast, please make sure they are called on all ranks rather than only on rank 0 (just like all_reduce in the script); this should resolve the issue. Let me know if it doesn’t work.
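To illustrate, here is a minimal corrected sketch of the frozen block from the script above (reusing its device, local_rank, gpu, and printflock), with dist.broadcast moved out of the rank-0 branch so every rank reaches the collective:

# broadcast is a collective: every rank must call it.
# src=0 means rank 0 sends its tensor and all other ranks
# receive the result into their own `count`.
count = torch.zeros(1).to(device)
if local_rank == 0:
    count.add_(1)  # only the source rank fills in the value
dist.broadcast(tensor=count, src=0)
dist.barrier()
printflock(f"{gpu} count: {count}")  # every rank now prints tensor([1.])

If only rank 0 calls dist.broadcast, its NCCL kernel spins waiting for the other ranks to join, which is why nvidia-smi shows 100% GPU utilization while the program hangs.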

It works, thank you!!!
