Distributed.all_reduce bandwidth expectations

I want to benchmark how quickly PyTorch with the Gloo backend can synchronously all-reduce a model. To do so, I’ve written the script below [2] against the latest Gloo backend / PyTorch. I start it on N machines, all connected to a 100 Mbps per-port switch, and the all-reduce itself completes fine. However, the bandwidth I observe is 0.5 * link_bandwidth, irrespective of N. For large N this is expected, since the documentation states that a ring all-reduce/all-gather is used behind the scenes to perform distributed.all_reduce.

For e.g. N=2, however, I would expect the all-reduce to run at 1.0 * link_bandwidth, since each node only needs to send its full model to one other node. My experiments nevertheless show 0.5 * link_bandwidth [3]. Following [1], I would expect the amount of data each node sends for any N to be:

Data each node sends in the reduce-scatter phase: (N - 1) / N of the model
Data each node sends in the all-gather phase: (N - 1) / N of the model
Total data each node sends: 2 * (N - 1) / N of the model
I.e., at N=2 each node only needs to send 2 * 1/2 = 1x the model, while as N -> inf each node has to send 2x the model. This translates into an effective all-reduce bandwidth of 1.0 * link_bandwidth (N=2) down to 0.5 * link_bandwidth (N -> inf).
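
Putting concrete numbers on that for my setup (2048 Mbit model, 100 Mbit/s links), under the assumption that the run time is dominated purely by each node pushing 2 * (N - 1) / N models over its own full-duplex link (no protocol overhead, no reduction cost):

# Expected numbers under the assumption above; "effective speed" is computed
# the same way as in the script below: model size divided by wall-clock time.
MODEL_MBIT = 2048.0
LINK_MBIT_PER_S = 100.0

for n in (2, 4, 16, 1000):
    mbit_per_node = 2.0 * (n - 1) / n * MODEL_MBIT
    expected_s = mbit_per_node / LINK_MBIT_PER_S
    effective = MODEL_MBIT / expected_s
    print("N=%4d: ~%.1f s, ~%.1f Mbit/s effective" % (n, expected_s, effective))

# N=   2: ~20.5 s, ~100.0 Mbit/s effective
# N=   4: ~30.7 s, ~66.7 Mbit/s effective
# N=  16: ~38.4 s, ~53.3 Mbit/s effective
# N=1000: ~40.9 s, ~50.1 Mbit/s effective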

I am not sure where my calculation is wrong, or what I am misunderstanding about the ring all-reduce method.
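
To double-check my understanding of the ring method itself, here is a toy, single-process simulation of a ring reduce-scatter followed by a ring all-gather, which also counts how much data each simulated node sends. This is purely illustrative, not Gloo's actual implementation:

def ring_allreduce(vectors):
    # vectors[r] is node r's local copy of the model (a plain Python list).
    n = len(vectors)                  # number of nodes
    size = len(vectors[0])            # values per node; assumed divisible by n
    chunk = size // n
    sent = [0] * n                    # values sent by each node

    def sl(i):
        i %= n
        return slice(i * chunk, (i + 1) * chunk)

    # Reduce-scatter: n - 1 steps. At step t, node r forwards its partial sum
    # of chunk (r - t) to node (r + 1). Messages are snapshotted first so the
    # simultaneous exchange of a real ring is modelled correctly.
    for t in range(n - 1):
        msgs = [(r, (r + 1) % n, sl(r - t), vectors[r][sl(r - t)]) for r in range(n)]
        for src, dst, s, payload in msgs:
            sent[src] += chunk
            for k, v in zip(range(s.start, s.stop), payload):
                vectors[dst][k] += v

    # All-gather: n - 1 more steps. Node r now owns the fully reduced chunk
    # (r + 1), and the fully reduced chunks are passed around the ring.
    for t in range(n - 1):
        msgs = [(r, (r + 1) % n, sl(r + 1 - t), vectors[r][sl(r + 1 - t)]) for r in range(n)]
        for src, dst, s, payload in msgs:
            sent[src] += chunk
            vectors[dst][s] = payload

    return sent

# Quick check with 4 nodes and 8 values per node.
data = [[float(r + 1)] * 8 for r in range(4)]
sent = ring_allreduce(data)
assert all(v == 10.0 for vec in data for v in vec)   # 1 + 2 + 3 + 4 = 10 everywhere
print("values sent per node:", sent)                  # 12 each = 2 * (4 - 1) / 4 * 8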

[1] https://github.com/zhangruiskyline/DeepLearning/blob/master/doc/system.md#allreduce-in-practice

[2] allreduce.py – for N=2, run on the two machines respectively: python allreduce.py 0 2 192.168.0.1 10000 and python allreduce.py 1 2 192.168.0.1 10000

#!/usr/bin/env python

import os
import sys
import torch
import torch.distributed as dist
from torch.multiprocessing import Process
import time

# Values are 4 bytes each, so 2 * 1000 * 1000 * 32 * 4 = 256 MB = 2048 Mbit
MODEL_SIZE_VALUES = 2 * 1000 * 1000 * 32
BIT_PER_VALUE = 4 * 8
BITS_PER_MBIT = 1000 * 1000


def current_time_in_ms():
    return int(round(time.time() * 1000))


def run(rank, size):
    # Time a single all_reduce of the ~256 MB tensor across all ranks and
    # report the effective bandwidth seen by this rank.
    group = dist.new_group(list(range(size)))
    tensor = torch.ones(MODEL_SIZE_VALUES, dtype=torch.float32)
    print("Performing allreduce...")
    print("   > Data to send: %d Mbit" % ((MODEL_SIZE_VALUES * BIT_PER_VALUE) / float(BITS_PER_MBIT)))
    start = current_time_in_ms()
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    elapsed_ms = current_time_in_ms() - start
    print("   > Finished.")
    print("   > Time: %.2f s" % (elapsed_ms / 1000.0))
    print("   > Speed: %.2f Mbit/s" % ((MODEL_SIZE_VALUES * BIT_PER_VALUE / BITS_PER_MBIT) / float(elapsed_ms / 1000.0)))
    print('   > Result: Rank ', rank, ' has data ', str(tensor), '.\n')


def init_process(my_rank, size, master_address, master_port, fn, backend='gloo'):
    # Initialize the distributed environment
    os.environ['MASTER_ADDR'] = master_address
    os.environ['MASTER_PORT'] = master_port

    # Initialize process group
    print("Initializing process group...")
    dist.init_process_group(backend, rank=my_rank, world_size=size)
    print("   > Initialized.")
    print("")
    fn(my_rank, size)


def main(my_rank, size, master_address, master_port):
    p = Process(target=init_process, args=(my_rank, size, master_address, master_port, run))
    p.start()
    p.join()


if __name__ == "__main__":
    args = sys.argv[1:]
    if len(args) != 4:
        print("Usage: python allreduce.py <my rank> <size> <master address> <master port>")
        sys.exit(1)
    else:
        main(int(args[0]), int(args[1]), str(args[2]), str(args[3]))
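
As an aside, here is a drop-in alternative to run() in the script above that might give more stable numbers: one warm-up all_reduce so connection setup is excluded, a barrier so all ranks start together, and the time averaged over a few iterations. The ITERATIONS value and the barrier placement are my own choices; this is just a sketch:

ITERATIONS = 5   # arbitrary; enough to smooth out per-run noise


def run_averaged(rank, size):
    group = dist.new_group(list(range(size)))
    tensor = torch.ones(MODEL_SIZE_VALUES, dtype=torch.float32)

    # Warm-up so connection setup is not part of the timed region. The tensor
    # values keep getting summed across iterations, which is irrelevant for timing.
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    dist.barrier(group=group)

    start = current_time_in_ms()
    for _ in range(ITERATIONS):
        dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    elapsed_s = (current_time_in_ms() - start) / 1000.0 / ITERATIONS

    mbit = MODEL_SIZE_VALUES * BIT_PER_VALUE / float(BITS_PER_MBIT)
    print("   > Average time per all_reduce: %.2f s" % elapsed_s)
    print("   > Speed: %.2f Mbit/s" % (mbit / elapsed_s))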

[3] Output of machines 1 and 2 for N=2:

Machine 1:

Initializing process group...
   > Initialized.

Performing allreduce...
   > Data to send: 2048 Mbit
   > Finished.
   > Time: 44.79 s
   > Speed: 45.72 Mbit/s
   > Result: Rank  0  has data  tensor([2., 2., 2.,  ..., 2., 2., 2.]) .

Machine 2:

Initializing process group...
   > Initialized.

Performing allreduce...
   > Data to send: 2048 Mbit
   > Finished.
   > Time: 44.79 s
   > Speed: 45.72 Mbit/s
   > Result: Rank  1  has data  tensor([2., 2., 2.,  ..., 2., 2., 2.]) .

Thanks for posting the question. Your analysis is correct: the all-reduce should reach link bandwidth for N=2 (provided the input dimensions are large enough for the runtime to be dominated by the bandwidth). The Gloo implementation ensures there is always 1 chunk in flight and 1 chunk being reduced, so it is possible something is wrong there for the N=2 case.
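
For intuition, the point of that pipelining is that summing one chunk overlaps with receiving the next, so ideally only a single chunk's reduction cost stays exposed. A tiny illustration of the idea; the chunk count and per-chunk costs below are made-up numbers, not anything measured from Gloo:

# Made-up numbers purely to illustrate the overlap of transfer and reduction.
CHUNKS = 8
TRANSFER_PER_CHUNK_S = 2.56   # e.g. 256 Mbit per chunk over a 100 Mbit/s link
REDUCE_PER_CHUNK_S = 0.10     # hypothetical CPU time to sum one chunk

no_overlap = CHUNKS * (TRANSFER_PER_CHUNK_S + REDUCE_PER_CHUNK_S)
pipelined = CHUNKS * TRANSFER_PER_CHUNK_S + REDUCE_PER_CHUNK_S
print("no overlap: %.2f s   pipelined: %.2f s" % (no_overlap, pipelined))
# no overlap: 21.28 s   pipelined: 20.58 s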

I’ll investigate and post back here.

edit: I filed https://github.com/facebookincubator/gloo/issues/169 so we don't lose track of this.

I got around to running a test with the Gloo benchmark tool and confirmed the issue.

For a larger explanation of the issue and the fix, see https://github.com/facebookincubator/gloo/pull/192.

Once this is merged and bumped in PyTorch, I expect you’ll be able to run the same test and find the bandwidth to be very close to link speed.