Hi, I am trying to use DDP for inference: during inference I have both CPU-intensive and GPU-intensive tasks, and I would like to offload as much of the CPU work as possible to other nodes, while the GPU work can be completed by a single node.
I figured out how to use PyTorch's DDP module to connect different server nodes; however, I am having trouble using the recv
function to receive tensors from other nodes. Every time a node is about to receive a tensor, the message Aborted
prints to the console and the process stops. I tried creating a toy example, but the abort doesn't happen there (I've included a version of it at the end of this post). The tensor is on the CPU and its shape is predetermined. Is there a reason why the process aborts? Is there a way to debug gloo DDP like there is for nccl DDP (e.g. NCCL_DEBUG=INFO)?
Also, is there a way to send and receive tensors without having to specify the shape ahead of time?
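The only workaround I can think of is a two-step protocol where the shape travels in a small fixed-size message before the payload. This is just my own sketch (MAX_DIMS, the helper names, and the default dtype are my assumptions), and it assumes an already-initialized gloo process group:

import torch
import torch.distributed as dist

MAX_DIMS = 8  # assumed upper bound on tensor rank

def send_with_shape(tensor, dst):
    # Pad the shape into a fixed-size long tensor so the receiver
    # can always recv it without knowing the rank in advance.
    shape = torch.full((MAX_DIMS,), -1, dtype=torch.long)
    shape[: tensor.dim()] = torch.tensor(tensor.shape, dtype=torch.long)
    dist.send(shape, dst=dst)
    dist.send(tensor, dst=dst)

def recv_with_shape(src, dtype=torch.float):
    shape = torch.empty(MAX_DIMS, dtype=torch.long)
    dist.recv(shape, src=src)
    dims = [int(d) for d in shape if d >= 0]
    buf = torch.empty(dims, dtype=dtype)
    dist.recv(buf, src=src)
    return buf

If there is something built in that does this, I would prefer to use that.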
I am using PyTorch 1.11 with CUDA 11.3 on A40s.
Connection code:
import os
import sys

import ifcfg
import torch.distributed as distrib


def get_ifname():
    return ifcfg.default_interface()["device"]


def init_distrib_slurm(world_rank, world_size, backend="nccl"):
    # Make sure both backends bind to the default network interface.
    if "GLOO_SOCKET_IFNAME" not in os.environ:
        os.environ["GLOO_SOCKET_IFNAME"] = get_ifname()
    if "NCCL_SOCKET_IFNAME" not in os.environ:
        os.environ["NCCL_SOCKET_IFNAME"] = get_ifname()

    master_port = int(os.environ.get("MASTER_PORT", 8738))
    master_addr = os.environ.get("MASTER_ADDR", "127.0.0.1")

    # Fall back to the SLURM-provided rank/world size when not given explicitly.
    # local_rank = int(os.environ.get("LOCAL_RANK", os.environ.get("SLURM_LOCALID", 0)))
    world_rank = int(os.environ.get("RANK", os.environ.get("SLURM_PROCID", 0))) if not world_rank else world_rank
    world_size = int(os.environ.get("WORLD_SIZE", os.environ.get("SLURM_NTASKS", 1))) if not world_size else world_size

    print(master_addr, master_port, world_rank, world_size)
    sys.stdout.flush()

    # Rank 0 hosts the TCPStore; every other rank connects to it.
    tcp_store = distrib.TCPStore(master_addr, master_port, world_size, world_rank == 0)
    distrib.init_process_group(
        backend, store=tcp_store, rank=world_rank, world_size=world_size
    )
    return tcp_store
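For reference, this is roughly how I call it, one process per node launched through SLURM (the gloo choice and the None arguments here reflect my usage; rank and world size come from the SLURM_* variables):

store = init_distrib_slurm(None, None, backend="gloo")
print(f"rank {distrib.get_rank()} of {distrib.get_world_size()} ready")
sys.stdout.flush()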
Send and receive code:
if self.rank != 0:
    # Tokenization; the real process is a bit more complicated than this.
    tokenized = self.tokenizer(input1)
    torch.distributed.send(tokenized["input_ids"], 0)

if self.rank == 0:
    # Preallocate a CPU buffer with the known shape and receive into it.
    tokenized = torch.zeros([320, 9])
    sender_rank = torch.distributed.recv(tokenized)
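For completeness, here is a version of the toy example mentioned above. It is a single-node simplification of my setup (two processes spawned locally, gloo backend, same CPU tensor shape), and the recv completes fine there:

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def worker(rank, world_size):
    dist.init_process_group(
        "gloo",
        init_method="tcp://127.0.0.1:8738",
        rank=rank,
        world_size=world_size,
    )
    if rank != 0:
        # Same shape and placement as the real code: CPU tensor, known shape.
        payload = torch.ones([320, 9])
        dist.send(payload, dst=0)
    else:
        buf = torch.zeros([320, 9])
        sender = dist.recv(buf)
        print("received from rank", sender, "sum:", buf.sum().item())
    dist.destroy_process_group()

if __name__ == "__main__":
    mp.spawn(worker, args=(2,), nprocs=2)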