I want to benchmark how quickly PyTorch with the Gloo backend can all-reduce a model synchronously. To do so, I've written the script below, which works with the latest Gloo backend / PyTorch. I start it on N machines, and together they perform the all-reduce just fine. However, the bandwidth I observe, irrespective of N, is 0.5 * link_bandwidth. The N machines are all connected to a 100 Mbps per-port switch. This is expected for large N, as the documentation does state that a ring all-reduce/all-gather is used to perform distributed.all_reduce behind the scenes.
However, for e.g. N=2 I would expect it to perform the all-reduce at 1.0 * link_bandwidth, since each node only needs to send its full model to one other node. My experiments, however, show 0.5 * link_bandwidth. For any N, I would expect the per-node data volume of the ring all-reduce to be:
Data each node sends in the reduce-scatter phase: (N - 1) / N * model size
Data each node sends in the all-gather phase: (N - 1) / N * model size
Total data each node sends: 2 * (N - 1) / N * model size
I.e., at N=2 each node only needs to send 2 * 1/2 = 1x the model, and as N->inf each node has to send 2x the model. This translates into an effective all-reduce bandwidth of 1.0 * link_bandwidth at N=2 and 0.5 * link_bandwidth as N->inf.
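To make this expectation concrete, here is a small sketch of the cost model above, plugging in the 100 Mbit/s link and the 2048 Mbit model used in the script below (the 2 * (N - 1) / N traffic formula is my expectation of the ring algorithm, not necessarily what Gloo actually does):

# Sketch of the expected ring all-reduce cost model described above.
# Assumption: each node sends 2 * (N - 1) / N times the model size over its link.
LINK_BANDWIDTH_MBIT_S = 100.0  # 100 Mbps per-port switch
MODEL_SIZE_MBIT = 2048.0       # 256 MB model, as in the script below

def expected_time_s(n):
    traffic_mbit = 2.0 * (n - 1) / n * MODEL_SIZE_MBIT  # per-node traffic
    return traffic_mbit / LINK_BANDWIDTH_MBIT_S

def expected_speed_mbit_s(n):
    # "Speed" in the sense the script reports it: model size divided by elapsed time
    return MODEL_SIZE_MBIT / expected_time_s(n)

for n in (2, 4, 8, 1000):
    print(n, round(expected_speed_mbit_s(n), 1))
# Prints 100.0 for N=2 (1.0 * link_bandwidth) and approaches 50.0 as N grows
# (0.5 * link_bandwidth), matching the expectation stated above.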
I am not sure where my calculation is wrong, or where I am misunderstanding the ring all-reduce algorithm.
 allreduce.py – execute for N=2 on each of the two machines:
python allreduce.py 0 2 192.168.0.1 10000 and
python allreduce.py 1 2 192.168.0.1 10000
#!/usr/bin/env python
import os
import sys
import torch
import torch.distributed as dist
from torch.multiprocessing import Process
import time

# Values are 4 bytes each, so 2 * 1000 * 1000 * 32 * 4 = 256 MB = 2048 Mbit
MODEL_SIZE_VALUES = 2 * 1000 * 1000 * 32
BIT_PER_VALUE = 4 * 8
BITS_PER_MBIT = 1000 * 1000


def current_time_in_ms():
    return int(round(time.time() * 1000))


def run(rank, size):
    group = dist.new_group(list(range(size)))
    tensor = torch.ones(MODEL_SIZE_VALUES, dtype=torch.float32)
    print("Performing allreduce...")
    print(" > Data to send: %d Mbit" % ((MODEL_SIZE_VALUES * BIT_PER_VALUE) / float(BITS_PER_MBIT)))
    start = current_time_in_ms()
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    elapsed_ms = current_time_in_ms() - start
    print(" > Finished.")
    print(" > Time: %.2f s" % (elapsed_ms / 1000.0))
    print(" > Speed: %.2f Mbit/s" % ((MODEL_SIZE_VALUES * BIT_PER_VALUE / BITS_PER_MBIT) / float(elapsed_ms / 1000.0)))
    print(' > Result: Rank ', rank, ' has data ', str(tensor), '.\n')


def init_process(my_rank, size, master_address, master_port, fn, backend='gloo'):
    # Initialize the distributed environment
    os.environ['MASTER_ADDR'] = master_address
    os.environ['MASTER_PORT'] = master_port

    # Initialize process group
    print("Initializing process group...")
    dist.init_process_group(backend, rank=my_rank, world_size=size)
    print(" > Initialized.")
    print("")

    fn(my_rank, size)


def main(my_rank, size, master_address, master_port):
    p = Process(target=init_process, args=(my_rank, size, master_address, master_port, run))
    p.start()
    p.join()


if __name__ == "__main__":
    args = sys.argv[1:]
    if len(args) != 4:
        print("Usage: python allreduce.py <my rank> <size> <master address> <master port>")
        exit(1)
    else:
        main(int(args[0]), int(args[1]), str(args[2]), str(args[3]))
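For reference, the timed region could also be preceded by an explicit barrier so that start-up skew between the two processes is not included in the measurement. This is only a sketch of that variant; the timings below do not include it:

# Sketch: same timing section as in run() above, but with an explicit barrier
# so that both ranks enter the all-reduce at (roughly) the same time.
# Assumption: this replaces the timed block inside run(), using the same group/tensor.
dist.barrier(group=group)          # synchronize before starting the clock
start = current_time_in_ms()
dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
elapsed_ms = current_time_in_ms() - start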
Output of machines 1 and 2 with N=2:
Machine 1 (rank 0):

Initializing process group...
 > Initialized.

Performing allreduce...
 > Data to send: 2048 Mbit
 > Finished.
 > Time: 44.79 s
 > Speed: 45.72 Mbit/s
 > Result: Rank 0 has data tensor([2., 2., 2., ..., 2., 2., 2.]) .
Machine 2 (rank 1):

Initializing process group...
 > Initialized.

Performing allreduce...
 > Data to send: 2048 Mbit
 > Finished.
 > Time: 44.79 s
 > Speed: 45.72 Mbit/s
 > Result: Rank 1 has data tensor([2., 2., 2., ..., 2., 2., 2.]) .
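For completeness, the "Speed" reported above is simply the model size divided by the elapsed time, so the output is internally consistent; a quick check of that arithmetic:

# Check of the numbers reported above: Speed = model size / elapsed time.
model_size_mbit = 2048.0
elapsed_s = 44.79
print(model_size_mbit / elapsed_s)  # ~45.72 Mbit/s, i.e. roughly 0.46 of the 100 Mbit/s link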