How to overlap communication and computation in distributed training?

Hello everyone!
I used four nodes to test overlapping communication with computation, but the results were very confusing.
Please see the code below.

#!/usr/bin/env python
import argparse
import os
import threading

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def send_tensor(tensor):
    # blocking point-to-point send to rank 1; returns only when the send completes
    dist.send(tensor=tensor, dst=1)

def recv_tensor(device):
    # allocate a ~4 GB receive buffer on the GPU and block until rank 0's tensor arrives
    tensor_to_recv = torch.rand((1000, 1000, 1000)).to(device)
    dist.recv(tensor=tensor_to_recv, src=0)

def init_subprocess(rank, world_size):
    dist.init_process_group('nccl', init_method='tcp://IP:port', rank=rank, world_size=world_size)
    print("subprocess init end...")

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--rank', type=int)
    parser.add_argument('--world_size', type=int)
    args = parser.parse_args()
    os.environ["MASTER_ADDR"] = "IP"
    os.environ["MASTER_PORT"] = "port"
    dist.init_process_group('nccl', rank=args.rank, world_size=args.world_size)
    mp.set_start_method("spawn")
    print("init end...")
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    tot = list(range(1000))
    # p = mp.Pool(1)
    # p.apply(func=init_subprocess, args=(args.rank, args.world_size,))
    # if args.rank == 0:
    #     #communication
    #     tensor_to_send = torch.zeros((1000, 1000, 1000)).to(device)
    #     p.apply_async(func=send_tensor, args=(tensor_to_send, ))
    #     print('send end...')
    #     #compute
    #     tot = tot * (args.rank)
    # else:
    #     #communication
    #     p.apply_async(func=recv_tensor, args=(device, ))
    #     print('recv end...')
    #     #compute
    #     tot = tot * (args.rank)

    if args.rank == 0:
        # communication
        tensor_to_send = torch.zeros((1000, 1000, 1000)).to(device)
        p = threading.Thread(target=send_tensor, args=(tensor_to_send,))
        p.start()
        print('send end...')
        # compute
        tot = tot*(args.rank)
    else:
        #communication
        p = threading.Thread(target=recv_tensor, args=(device,))
        p.start()
        print('recv end...')
        # compute
        tot = tot*(args.rank)
    print(tot)
    # wait for the communication thread to finish before the process exits
    p.join()

I used multiprocessing and multithreading, respectively, to try to achieve this parallelism, and I made sure the computation does not depend on the result of the communication. However, the computation does not start right after the communication is launched, as I expected; it only starts after the communication has finished. That is not the overlap I had in mind.
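For comparison, here is a minimal sketch of the kind of overlap I expected, using the non-blocking point-to-point API (dist.isend / dist.irecv). The function name and tensor shapes are made up for illustration, and I have not verified whether the NCCL kernel and the compute kernels actually run concurrently on the GPU:

import torch
import torch.distributed as dist

def overlap_send_with_compute(rank, device):
    # same two-rank setup as above; the NCCL process group is assumed to be initialized
    comm_tensor = torch.zeros((1000, 1000), device=device)
    a = torch.randn((4096, 4096), device=device)
    b = torch.randn((4096, 4096), device=device)

    # isend/irecv return a work handle immediately instead of blocking
    if rank == 0:
        req = dist.isend(tensor=comm_tensor, dst=1)
    else:
        req = dist.irecv(tensor=comm_tensor, src=0)

    # independent compute that never reads comm_tensor
    for _ in range(10):
        a = a @ b

    # synchronize only when the communicated tensor is actually needed
    req.wait()
    return a, comm_tensor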

I’m very confused. Could you give me some advice?

Thank you very much for your help. Looking forward to your reply!

@Yanli_Zhao @ptrblck @mrshenli @aazzolini Can you give me some advice? I would appreciate it very much.

I’m not sure I understand your use case, but wouldn’t you run into the Python GIL?

Thank you for your reply!
The multithreaded implementation probably does run into the GIL, but the multiprocessing version I commented out should not.
However, even with multiprocessing, the computation still starts only after the communication has finished.
Can't multiprocessing and GPU communication run at the same time?
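For what it's worth, here is a single-process sketch of what I plan to try next: a collective launched with async_op=True, so the call returns a work handle right away and the Python thread is free to run compute. The all_reduce op, the helper name, and the tensor sizes are just placeholders, and I do not yet know whether the NCCL kernel and the matmul kernels really execute at the same time on the device:

import torch
import torch.distributed as dist

def try_async_collective(rank, device):
    comm_tensor = torch.ones((1000, 1000), device=device)
    a = torch.randn((4096, 4096), device=device)

    # async_op=True: the call returns immediately with a work handle
    work = dist.all_reduce(comm_tensor, op=dist.ReduceOp.SUM, async_op=True)

    # compute that does not touch comm_tensor
    for _ in range(10):
        a = a @ a

    work.wait()  # block only once the reduced tensor is needed
    return a, comm_tensor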

Generally it would depend on the occupancy of the kernels. In particular, you would have to check whether the compute kernel already occupies all compute resources, in which case the NCCL kernel cannot run alongside it, since NCCL needs to spin up SMs for its communication.
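As a rough sketch of how you could check this, something like the following profiles one iteration and exports a Chrome trace; the tensor sizes, file name, and helper name are arbitrary, and the overlap (or lack of it) should show up on the CUDA timeline as NCCL kernels running next to the GEMM kernels:

import torch
import torch.distributed as dist
from torch.profiler import profile, ProfilerActivity

def profile_overlap(rank, device):
    comm_tensor = torch.zeros((1000, 1000), device=device)
    a = torch.randn((8192, 8192), device=device)

    with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof:
        # non-blocking point-to-point op, then independent compute
        req = dist.isend(comm_tensor, dst=1) if rank == 0 else dist.irecv(comm_tensor, src=0)
        for _ in range(5):
            a = a @ a  # independent compute kernels
        req.wait()
        torch.cuda.synchronize(device)

    # inspect the trace in chrome://tracing or Perfetto
    prof.export_chrome_trace(f"trace_rank{rank}.json")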

Thank you for your advice. I’ll give it a try.