I want to benchmark how quickly PyTorch with the Gloo backend can all-reduce a model synchronously. To do so, I've written the script below [2], using the latest Gloo backend / PyTorch. I start it on N machines, and together they perform the all-reduce without problems. However, the bandwidth I observe, irrespective of N, is 0.5 * link_bandwidth. The N machines are all connected to a 100 Mbps per-port switch. This is expected for large N, as the documentation states that distributed.all_reduce uses a ring all-reduce (a reduce-scatter phase followed by an all-gather phase) behind the scenes.
However, for e.g. N=2 I would expect the all-reduce to run at 1.0 * link_bandwidth, as each node only needs to send its full model to one other node. My experiments [3] nevertheless show 0.5 * link_bandwidth. For any N, I would expect the all-reduce bandwidth to be [1]:
Data each node sends in the reduce-scatter phase: (N - 1) / N of the model
Data each node sends in the all-gather phase: (N - 1) / N of the model
Total data each node sends: 2 * (N - 1) / N of the model
I.e., at N=2 each node only needs to send 2 * 1/2 = 1x the model, while as N -> inf each node has to send 2x the model. This translates into an all-reduce bandwidth of 1.0 * link_bandwidth at N=2 and 0.5 * link_bandwidth as N -> inf.
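For reference, this expectation can be spelled out in a small self-contained sketch (the helper name and the 100 Mbit/s default, matching the switch above, are mine):

def expected_allreduce_mbit_per_s(n, link_bandwidth_mbit=100.0):
    # Each node sends (n - 1) / n of the model in the reduce-scatter phase
    # and (n - 1) / n of the model again in the all-gather phase.
    traffic_in_models = 2.0 * (n - 1) / n
    # Wall-clock time is dominated by pushing that much data through the link,
    # so the effective all-reduce speed is the link rate divided by the traffic.
    return link_bandwidth_mbit / traffic_in_models

for n in (2, 3, 4, 8):
    print("N=%d -> expected %.1f Mbit/s" % (n, expected_allreduce_mbit_per_s(n)))
# N=2 -> expected 100.0 Mbit/s
# N=3 -> expected 75.0 Mbit/s
# N=4 -> expected 66.7 Mbit/s
# N=8 -> expected 57.1 Mbit/s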
I am not sure where my calculation is wrong, or what I am misunderstanding about the ring all-reduce method.
[1] https://github.com/zhangruiskyline/DeepLearning/blob/master/doc/system.md#allreduce-in-practice
[2] allreduce.py – for N=2, execute on the two machines respectively:
python allreduce.py 0 2 192.168.0.1 10000
python allreduce.py 1 2 192.168.0.1 10000
#!/usr/bin/env python
import os
import sys
import time

import torch
import torch.distributed as dist
from torch.multiprocessing import Process

# Values are 4 bytes each, so 2 * 1000 * 1000 * 32 * 4 bytes = 256 MB = 2048 Mbit
MODEL_SIZE_VALUES = 2 * 1000 * 1000 * 32
BIT_PER_VALUE = 4 * 8
BITS_PER_MBIT = 1000 * 1000


def current_time_in_ms():
    return int(round(time.time() * 1000))


def run(rank, size):
    # All ranks join a single group and all-reduce one large tensor
    group = dist.new_group(list(range(size)))
    tensor = torch.ones(MODEL_SIZE_VALUES, dtype=torch.float32)
    print("Performing allreduce...")
    print(" > Data to send: %d Mbit" % ((MODEL_SIZE_VALUES * BIT_PER_VALUE) / float(BITS_PER_MBIT)))
    start = current_time_in_ms()
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    elapsed_ms = current_time_in_ms() - start
    print(" > Finished.")
    print(" > Time: %.2f s" % (elapsed_ms / 1000.0))
    print(" > Speed: %.2f Mbit/s" % ((MODEL_SIZE_VALUES * BIT_PER_VALUE / BITS_PER_MBIT) / float(elapsed_ms / 1000.0)))
    print(" > Result: Rank %d has data %s.\n" % (rank, str(tensor)))


def init_process(my_rank, size, master_address, master_port, fn, backend='gloo'):
    # Initialize the distributed environment (rendezvous happens via the master)
    os.environ['MASTER_ADDR'] = master_address
    os.environ['MASTER_PORT'] = master_port

    # Initialize process group
    print("Initializing process group...")
    dist.init_process_group(backend, rank=my_rank, world_size=size)
    print(" > Initialized.")
    print("")
    fn(my_rank, size)


def main(my_rank, size, master_address, master_port):
    p = Process(target=init_process, args=(my_rank, size, master_address, master_port, run))
    p.start()
    p.join()


if __name__ == "__main__":
    args = sys.argv[1:]
    if len(args) != 4:
        print("Usage: python allreduce.py <my rank> <size> <master address> <master port>")
        sys.exit(1)
    main(int(args[0]), int(args[1]), str(args[2]), str(args[3]))
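For what it's worth, the numbers in [3] come from the single-shot timing in run() above. A more careful variant (a sketch only, assuming the same imports and constants as the script above; the iteration count is an arbitrary choice) would warm up once and average over several iterations behind a barrier:

def run_averaged(rank, size, iterations=5):
    group = dist.new_group(list(range(size)))
    tensor = torch.ones(MODEL_SIZE_VALUES, dtype=torch.float32)
    # Warm-up all-reduce so connection setup does not pollute the measurement
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    dist.barrier()  # make sure all ranks start the clock together
    start = current_time_in_ms()
    for _ in range(iterations):
        # Reusing the tensor changes its values, but not the amount of data sent
        dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    elapsed_s = (current_time_in_ms() - start) / 1000.0 / iterations
    mbit = MODEL_SIZE_VALUES * BIT_PER_VALUE / float(BITS_PER_MBIT)
    print(" > Average speed: %.2f Mbit/s" % (mbit / elapsed_s))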
[3] Output of machines 1 and 2 for N=2:
Machine 1:
Initializing process group...
> Initialized.
Performing allreduce...
> Data to send: 2048 Mbit
> Finished.
> Time: 44.79 s
> Speed: 45.72 Mbit/s
> Result: Rank 0 has data tensor([2., 2., 2., ..., 2., 2., 2.]) .
Machine 2:
Initializing process group...
> Initialized.
Performing allreduce...
> Data to send: 2048 Mbit
> Finished.
> Time: 44.79 s
> Speed: 45.72 Mbit/s
> Result: Rank 1 has data tensor([2., 2., 2., ..., 2., 2., 2.]) .