NCCL/Gloo: freeze with cyclic isend and recv

Hello!

I’ve written a very simple algorithm in two versions: one on the Gloo backend and one on the NCCL backend. Both versions freeze. Could you please explain why these programs hang?

I assume the cause is that I send data in a circle:

  1. process #1 sends data to process #2
  2. process #2 sends data to process #3
  3. process #3 sends data to process #1

But I don’t understand why that would lead to a freeze.

import os
import torch as th
import torch.distributed as dist
import torch.multiprocessing as mp
 
 
def run(rank: int, value: float, src: int, dst: int):
    tensor = th.FloatTensor([value,]).to(f"cuda:{rank}")
    print(f"[rk={rank}] tensor before send-recv: {tensor}")
    req = dist.isend(tensor=tensor, dst=dst)
    print(f"[rk={rank}] after isend")
    dist.recv(tensor=tensor, src=src)
    print(f"[rk={rank}] after recv")
    req.wait()
    print(f"[rk={rank}] after wait")
    print(f"[rk={rank}] tensor after send-recv: {tensor}")
   
def init_process(rank: int):
    dist.init_process_group(
        "nccl",
        rank=rank,
        world_size=3,
        init_method="file:///home/user/store"
    )
    if rank==0:
        run(rank=rank, value=float(rank), src=1, dst=1)
    elif rank==1:
        run(rank, value=float(rank), src=2, dst=2)
    elif rank==2:
        run(rank, value=float(rank), src=0, dst=0)
    else:
        raise Exception()
 
 
if __name__ == "__main__":
    mp.set_start_method("spawn")
    processes = []
    for rank in [0, 1, 2]:
        p = mp.Process(target=init_process, args=(rank, ))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

And here’s the Gloo version (without CUDA):

import os
import torch as th
import torch.distributed as dist
import torch.multiprocessing as mp
 
 
def run(rank: int, value: float, src: int, dst: int):
    tensor = th.FloatTensor([value,])
    print(f"[rk={rank}] tensor before send-recv: {tensor}")
    req = dist.isend(tensor=tensor, dst=dst)
    print(f"[rk={rank}] after isend")
    dist.recv(tensor=tensor, src=src)
    print(f"[rk={rank}] after recv")
    req.wait()
    print(f"[rk={rank}] after wait")
    print(f"[rk={rank}] tensor after send-recv: {tensor}")
   
def init_process(rank: int):
    dist.init_process_group(
        "gloo",
        rank=rank,
        world_size=3,
        init_method="file:///home/user/store"
    )
    if rank==0:
        run(rank=rank, value=float(rank), src=1, dst=1)
    elif rank==1:
        run(rank, value=float(rank), src=2, dst=2)
    elif rank==2:
        run(rank, value=float(rank), src=0, dst=0)
    else:
        raise Exception()
 
 
if __name__ == "__main__":
    mp.set_start_method("spawn")
    processes = []
    for rank in [0, 1, 2]:
        p = mp.Process(target=init_process, args=(rank, ))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

Thank you!

Hi, looking at your code, it seems the src and dst values passed to the run function are wrong. Every rank is given src == dst, so no send has a matching receive: rank 0 sends to rank 1, but rank 1 receives from rank 2; rank 1 sends to rank 2, but rank 2 receives from rank 0; rank 2 sends to rank 0, but rank 0 receives from rank 1. Each process therefore blocks in dist.recv waiting for a message that is never sent to it, which is why both versions hang.
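
For reference, here is a minimal sketch of how the wiring could look, assuming the intended pattern is the ring you describe (each rank sends to the next rank and receives from the previous one). It keeps your file-store path and the Gloo backend, and it uses a separate receive buffer, since the tensor handed to isend should not be modified (for example, overwritten by recv) until the request has completed:

import torch as th
import torch.distributed as dist
import torch.multiprocessing as mp


def run(rank: int, value: float, src: int, dst: int):
    # Use separate buffers: the send buffer should not be reused
    # until the isend request has completed.
    send_buf = th.tensor([value], dtype=th.float32)
    recv_buf = th.empty_like(send_buf)
    req = dist.isend(tensor=send_buf, dst=dst)
    dist.recv(tensor=recv_buf, src=src)
    req.wait()
    print(f"[rk={rank}] sent {send_buf} to {dst}, received {recv_buf} from {src}")


def init_process(rank: int, world_size: int = 3):
    dist.init_process_group(
        "gloo",
        rank=rank,
        world_size=world_size,
        init_method="file:///home/user/store",  # same file store as in your script
    )
    # Ring: every rank sends to the next rank and receives from the previous one,
    # so each isend has a matching recv on another rank.
    dst = (rank + 1) % world_size
    src = (rank - 1) % world_size
    run(rank=rank, value=float(rank), src=src, dst=dst)


if __name__ == "__main__":
    mp.set_start_method("spawn")
    processes = []
    for rank in range(3):
        p = mp.Process(target=init_process, args=(rank,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

With this pairing, rank 0 ends up with rank 2’s value, rank 1 with rank 0’s, and rank 2 with rank 1’s. The NCCL version should behave the same way once the tensors are moved to the corresponding cuda:{rank} device.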