Hello everyone!
I used four nodes to test whether communication and computation can run in parallel.
However, the results were confusing.
Please see the code below.
#!/usr/bin/env python
import argparse
import os
import threading
import time

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def send_tensor(tensor):
    # Blocking send from rank 0 to rank 1.
    dist.send(tensor=tensor, dst=1)

def recv_tensor(device):
    # Allocate a receive buffer and block until the data from rank 0 arrives.
    tensor_to_recv = torch.rand((1000, 1000, 1000)).to(device)
    dist.recv(tensor=tensor_to_recv, src=0)

def init_subprocess(rank, world_size):
    # Used only by the (commented-out) multiprocessing variant below.
    dist.init_process_group('nccl', init_method='IP:port', rank=rank, world_size=world_size)
    print("subprocess init end...")
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--rank', type=int)
    parser.add_argument('--world_size', type=int)
    args = parser.parse_args()

    os.environ["MASTER_ADDR"] = "IP"
    os.environ["MASTER_PORT"] = "port"
    dist.init_process_group('nccl', rank=args.rank, world_size=args.world_size)
    mp.set_start_method("spawn")
    print("init end...")

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    tot = list(range(1000))

    # Multiprocessing variant (the other approach I tried):
    # p = mp.Pool(1)
    # p.apply(func=init_subprocess, args=(args.rank, args.world_size,))
    # if args.rank == 0:
    #     # communication
    #     tensor_to_send = torch.zeros((1000, 1000, 1000)).to(device)
    #     p.apply_async(func=send_tensor, args=(tensor_to_send,))
    #     print('send end...')
    #     # compute
    #     tot = tot * (args.rank)
    # else:
    #     # communication
    #     p.apply_async(func=recv_tensor)
    #     print('recv end...')
    #     # compute
    #     tot = tot * (args.rank)

    if args.rank == 0:
        # communication
        tensor_to_send = torch.zeros((1000, 1000, 1000)).to(device)
        p = threading.Thread(target=send_tensor, args=(tensor_to_send,))
        p.start()
        print('send end...')
        # compute
        tot = tot * (args.rank)
    else:
        # communication
        p = threading.Thread(target=recv_tensor, args=(device,))
        p.start()
        print('recv end...')
        # compute
        tot = tot * (args.rank)
    print(tot)
I tried both multiprocessing and multithreading to achieve this parallelism, and I made sure the computation does not use the result of the communication. However, the computation does not start right after the communication is launched, as I expected; it only runs after the communication has finished. That is not the parallelism I had in mind. A stripped-down version of what I am trying to measure is below.
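This sketch is only a minimal illustration, not exactly the script I ran: it assumes the same two-rank NCCL setup as above (MASTER_ADDR, MASTER_PORT and the tensor size are placeholders), and cpu_compute is just a stand-in for work that never touches the tensor being communicated. The timestamps around each phase are how I check whether the compute overlaps with the send/recv.

import argparse
import os
import threading
import time

import torch
import torch.distributed as dist


def communicate(rank, tensor):
    # Blocking point-to-point call: rank 0 sends to rank 1, the other rank receives.
    if rank == 0:
        dist.send(tensor=tensor, dst=1)
    else:
        dist.recv(tensor=tensor, src=0)


def cpu_compute():
    # Pure-Python work that does not depend on the tensor being communicated.
    total = 0
    for i in range(10_000_000):
        total += i
    return total


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--rank', type=int)
    parser.add_argument('--world_size', type=int)
    args = parser.parse_args()

    os.environ["MASTER_ADDR"] = "IP"    # placeholder
    os.environ["MASTER_PORT"] = "port"  # placeholder
    dist.init_process_group('nccl', rank=args.rank, world_size=args.world_size)

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    buf = torch.zeros((1000, 1000, 1000), device=device)

    t0 = time.time()
    worker = threading.Thread(target=communicate, args=(args.rank, buf))
    worker.start()
    t1 = time.time()
    result = cpu_compute()  # I expect this to run while the send/recv is still in flight
    t2 = time.time()
    worker.join()
    t3 = time.time()

    print(f"rank {args.rank}: thread start {t1 - t0:.3f}s, "
          f"compute {t2 - t1:.3f}s, join {t3 - t2:.3f}s, result {result}")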
I’m very confused. Could you give me some advice?
Thank you very much for your help. Looking forward to your reply!