Using DDP Gloo Recv "Aborts" Process

Hi, I am trying to use DDP for inference. During inference I have both CPU-intensive and GPU-intensive tasks, and I would like to offload as many of the CPU tasks as possible to different nodes, while the GPU tasks can be handled by a single node.

I figured out how to use PyTorch’s DDP module to connect different server nodes, but I am having trouble using the recv function to send tensors between nodes. Every time a node is about to receive a tensor, the message Aborted prints to the console and the process stops. I tried creating a toy example, but it doesn’t happen there. The tensor is on the CPU and its shape is predetermined. Is there a reason why the process aborts? Is there a way to debug Gloo DDP like there is for NCCL DDP?

Also, is there a way to send and receive tensors without having to specify the shape ahead of time?
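For context, the workaround I am considering is to send a small shape header first and allocate the receive buffer from it; this is just a sketch, and it assumes both sides agree on the payload dtype:

import torch
import torch.distributed as dist


def send_with_shape(tensor, dst):
    # Send the number of dims, then the shape, then the payload itself.
    shape = torch.tensor(tensor.shape, dtype=torch.long)
    dist.send(torch.tensor([shape.numel()], dtype=torch.long), dst)
    dist.send(shape, dst)
    dist.send(tensor, dst)


def recv_with_shape(src, dtype=torch.long):
    # Receive the header, allocate a matching buffer, then receive the payload.
    ndims = torch.zeros(1, dtype=torch.long)
    dist.recv(ndims, src=src)
    shape = torch.zeros(int(ndims.item()), dtype=torch.long)
    dist.recv(shape, src=src)
    out = torch.zeros(tuple(shape.tolist()), dtype=dtype)
    dist.recv(out, src=src)
    return out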

I am using PyTorch 1.11 with CUDA 11.3 on A40s.

Connection code:

import os
import sys

import ifcfg
import torch.distributed as distrib


def get_ifname():
    # Pick the default network interface so Gloo/NCCL bind to the right NIC
    return ifcfg.default_interface()["device"]


def init_distrib_slurm(world_rank, world_size, backend="nccl"):
    if "GLOO_SOCKET_IFNAME" not in os.environ:
        os.environ["GLOO_SOCKET_IFNAME"] = get_ifname()

    if "NCCL_SOCKET_IFNAME" not in os.environ:
        os.environ["NCCL_SOCKET_IFNAME"] = get_ifname()

    master_port = int(os.environ.get("MASTER_PORT", 8738))
    master_addr = os.environ.get("MASTER_ADDR", "127.0.0.1")
    # local_rank = int(os.environ.get("LOCAL_RANK", os.environ.get("SLURM_LOCALID", 0)))
    world_rank = int(os.environ.get("RANK", os.environ.get("SLURM_PROCID", 0))) if not world_rank else world_rank
    world_size = int(os.environ.get("WORLD_SIZE", os.environ.get("SLURM_NTASKS", 1))) if not world_size else world_size
    print(master_addr, master_port, world_rank, world_size)
    sys.stdout.flush()

    # Rank 0 hosts the TCPStore; the other ranks connect to it as clients.
    tcp_store = distrib.TCPStore(master_addr, master_port, world_size, world_rank == 0)
    distrib.init_process_group(
        backend, store=tcp_store, rank=world_rank, world_size=world_size
    )

    return tcp_store
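Each process calls it roughly like this (the gloo backend string and the None arguments below are just illustrative; passing None lets the rank and world size fall back to the RANK/SLURM environment variables):

store = init_distrib_slurm(None, None, backend="gloo")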

Send and receive code:

if self.rank != 0:
    tokenized = self.tokenizer(input1)  # tokenization process is a bit more complicated than this
    torch.distributed.send(tokenized["input_ids"], 0)
if self.rank == 0:
    tokenized = torch.zeros([320, 9])  # pre-allocated receive buffer with a known shape
    sender_rank = torch.distributed.recv(tokenized)  # recv returns the sender's rank

Can you please share the detailed error message? It would be nice if you have a stack trace as well. Thanks.

All I receive is the message Aborted on the rank 0 node, and that is it. I don’t get a detailed error message. Is there an equivalent to NCCL_DEBUG for Gloo that I can use?

Cc @osalpekar to have a look

Are any of the tensors you are trying to communicate CUDA tensors? Gloo send/recv does not support CUDA tensors at the moment (see the call to the underlying Gloo library here, which performs a CPU-based send over TCP: pytorch/ProcessGroupGloo.cpp at master · pytorch/pytorch · GitHub). If that is the case, you may need to copy the tensors to the CPU before performing send/recv on them.

Gloo collective operations support CUDA tensors by copying tensors to CPU before and after performing the actual communication using the CPU. We may be able to implement something similar for p2p operations like send/recv in the future.
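For reference, a minimal sketch of that manual copy (the tensor name, shape, and dtype below are placeholders rather than your actual values):

import torch
import torch.distributed as dist


def send_ids(tokenized_ids, dst=0):
    # Gloo send/recv only handles CPU tensors, so copy off the GPU first if needed.
    if tokenized_ids.is_cuda:
        tokenized_ids = tokenized_ids.cpu()
    dist.send(tokenized_ids, dst)


def recv_ids(src, shape=(320, 9), dtype=torch.long, device="cuda"):
    # Receive into a CPU buffer of the agreed-upon shape/dtype, then move it to the GPU.
    buf = torch.zeros(shape, dtype=dtype)
    dist.recv(buf, src=src)
    return buf.to(device)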

All tensors so far are CPU tensors. I am currently trying to send the tensors from the CPU machine to the GPU machine for processing.

I would imagine that the tensors being sent/received on both sides must be CPU tensors. I don’t think there is a communication library today that supports a direct send from a CPU tensor to a remote GPU tensor.

Can you check if the tensors you created on both machines are CPU tensors?
tensor.is_cuda should tell us that.
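Something like this on each rank, right before the send/recv calls, would confirm it (using the variable names from your snippet above; the dtype and shape prints are just extra context):

t = tokenized["input_ids"] if self.rank != 0 else tokenized
print(f"rank {self.rank}: is_cuda={t.is_cuda}, dtype={t.dtype}, shape={tuple(t.shape)}")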