Multi-GPU Calculations

I want to use PyTorch for its tensor math and not necessarily for training an ML model. I have a machine with 4 GPUs. The overall goal is to calculate the cosine similarity between pairs of embeddings utilizing all 4 GPUs. Here is a walkthrough of what I have thus far:

Read the data in as a pandas dataframe

import pandas as pd
tmp_df = pd.read_pickle('./embed_pairs_df.pkl')

There are 2 columns to tmp_df: embeds_a and embeds_b. Each column contains 1 million rows, making tmp_df.shape = (1000000, 2). Each element in a row is a PyTorch tensor of shape (300, 1). I’ve already worked out the combinations of pairs and stored them appropriately in tmp_df. As such, tmp_df['embeds_a'][i] and tmp_df['embeds_b'][i] would constitute the ith pair of embeddings that would need to be run through a cosine similarity function.

My question is, given the format of the data and the 4 GPUs available, what is the best way to distribute the cosine similarity calculation across all 4 GPUs in parallel?

Any thoughts on this?

This sounds like a data parallel problem, although the nested structure seems like it might be an issue if you need to “unpack” PyTorch tensors from Python data structures.

For this case, you might want to consider preprocessing the data into a higher-dimensional tensor, e.g., one of shape (1000000, 2, 300).

From here, it becomes a straightforward data-parallel problem that shouldn’t take long on 4 GPUs (or likely even a single GPU): split the data into equal-sized parts, move the different parts to different GPUs, and aggregate the results.
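A rough sketch of that idea (untested; it assumes the column elements really are (300, 1) tensors as described above, and all variable names are made up): pack the pairs into one (N, 2, 300) tensor, chunk the rows across the visible GPUs, and collect the results on the CPU. The per-GPU launches overlap because CUDA kernels run asynchronously.

import pandas as pd
import torch
import torch.nn.functional as F

tmp_df = pd.read_pickle('./embed_pairs_df.pkl')

# Pack the (300, 1) tensors into one (N, 2, 300) tensor.
a = torch.stack([t.view(-1) for t in tmp_df['embeds_a']])   # (N, 300)
b = torch.stack([t.view(-1) for t in tmp_df['embeds_b']])   # (N, 300)
pairs = torch.stack([a, b], dim=1)                          # (N, 2, 300)

# Split the rows across the available GPUs, compute on each, gather on CPU.
num_gpus = torch.cuda.device_count()
results = []
for gpu_id, chunk in enumerate(torch.chunk(pairs, num_gpus, dim=0)):
    chunk = chunk.to(f'cuda:{gpu_id}', non_blocking=True)
    # cosine similarity along the embedding dimension for every pair in the chunk
    sims = F.cosine_similarity(chunk[:, 0, :], chunk[:, 1, :], dim=1)
    results.append(sims.cpu())

all_sims = torch.cat(results)   # shape (N,)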

At this point I’d be concerned that all of the preprocessing time (e.g., the steps to generate tmp_df and pack it into a large tensor) would be significantly greater than the calculation time for cosine similarity.

You could write a custom network (subclass of torch.nn.Module) whose forward() function computes the cosine similarity, and then use DataParallel or DistributedDataParallel to run your data through this network.
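For instance, a sketch along these lines might work (untested; the CosSim module, the (N, 2, 300) layout, and the dummy data are assumptions, not your code):

import torch
from torch import nn
import torch.nn.functional as F


class CosSim(nn.Module):
    """Parameter-free module whose forward() just computes cosine similarity."""

    def forward(self, embeds):
        # embeds: (batch, 2, 300); similarity between the two embeddings per row
        return F.cosine_similarity(embeds[:, 0, :], embeds[:, 1, :], dim=1)


model = nn.DataParallel(CosSim()).to('cuda:0')  # replicas on all visible GPUs
pairs = torch.randn(1_000_000, 2, 300)          # hypothetical preprocessed data
sims = model(pairs.to('cuda:0'))                # DataParallel scatters along dim 0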

Note: This is just a hunch, I haven’t done something similar (well, except all the training I do is in fact quite similar …) so there may be pitfalls which I don’t see.

@gphilip Thanks for your response. I’ve been going off the example detailed here, but I have a few questions about testing this out. Here is what I have so far:

from torch import nn
import torch.multiprocessing as mp
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP


class CosSimNetwork(nn.Module):
    def __init__(self):
        super(CosSimNetwork, self).__init__()

    def forward(self, embeds):
        #Here do the cosine similarity calculation
        cos_sims = 12
        return cos_sims

def calc_cos_sims(rank, world_size):
    dist.init_process_group('gloo', rank=rank, world_size=world_size)
    model = CosSimNetwork()
    ddp_model = DDP(model, device_ids=[rank])

    cos_sims = ddp_model(DATA.to(rank))


def main():
    world_size = 4 #since I have 4 GPUs on a single machine
    mp.spawn(calc_cos_sims,
             args=(world_size,),
             nprocs=world_size,
             join=True)

if __name__ == '__main__':
    main()

If I understand correctly, because I have 4 GPUs world_size = 4. However, I’m not entirely sure what rank should be given that I have 4 GPUs. Also, I have my data stored in a pickled pandas dataframe with two columns being the embeddings that would be used in the cosine similarity calculation. It looks something like cos_sim(embed_a, embed_b). I’m not entirely sure how to read in the data properly for DistributedDataParallel and the forward method of CosSimNetwork. Any advice is much appreciated.

For DistributedDataParallel, you need to run one process per GPU and use ranks 0 - 3 for GPUs 0 - 3. I think your example code above should already do that automatically, since mp.spawn would pass in the appropriate rank. For reading the data, each process can read a chunk of the data and process it: either pre-split ./embed_pairs_df.pkl into 4 chunks, or read the same file on all processes and have each process handle only part of the input (say, process 0 computes rows 0, 4, 8, and so on).
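Something like this untested sketch is what I mean by each process handling only its own part of the input (the helper name and the strided split are just one possible choice):

import pandas as pd


def load_shard(rank: int, world_size: int) -> pd.DataFrame:
    """Hypothetical helper: every process reads the full pickle but keeps only
    its own rows (strided split: rank 0 gets rows 0, 4, 8, ...)."""
    df = pd.read_pickle('./embed_pairs_df.pkl')
    return df.iloc[rank::world_size]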

I have not used DistributedDataParallel yet, so I am not sure about the right way to use it.

I would suggest that you first write code that works on a single GPU, just to ensure that it works properly without the parallelization. You could then try using DataParallel (which involves adding just one extra line of code) to see if that speeds it up sufficiently for your needs. Maybe it will, and then you don’t have to worry about the more involved DistributedDataParallel.

@pritamdamania87 thank you very much for your reply. I believe I have implemented your suggestion, but if you don’t mind checking it for me to make sure it makes sense:

from torch import nn
import torch.multiprocessing as mp
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import pandas as pd

class CosSimNetwork(nn.Module):
    def __init__(self):
        super(CosSimNetwork, self).__init__()

    def forward(self, embeds):
        cos_sims = 12
        return cos_sims

def determine_subranges(fullrange: tuple, num_subranges: int):
    subranges = []
    inc = fullrange[1] // num_subranges
    for i in range(fullrange[0], fullrange[1], inc):
        subranges.append( (i, min(i+inc, fullrange[1])) )
    return( subranges )


def calc_cos_sims(rank, world_size):
    dist.init_process_group('gloo', rank=rank, world_size=world_size)
    model = CosSimNetwork()
    ddp_model = DDP(model, device_ids=[rank])
    tmp_df = pd.read_pickle('./embed_pairs_df_million.pkl')
    sub_ranges = determine_subranges((0,tmp_df.shape[0]), world_size)
    sub_range_tuple = sub_ranges[rank]
    data = tmp_df.iloc[sub_range_tuple[0]:sub_range_tuple[1]]
    cos_sims = ddp_model(data.to(rank))


def main():
    world_size = 4 #since I have 4 GPUs on a single machine
    mp.spawn(calc_cos_sims,
             args=(world_size,),
             nprocs=world_size,
             join=True)

if __name__ == '__main__':
    main()

If I understand this correctly, mp.spawn() will create 4 different processes (based off of world_size) that will run calc_cos_sims(). The input data is sliced into world_size chunks and the rank determines which chunk gets sent to which GPU. Once all the processes are finished, mp.spawn() will aggregate the results. Is my understanding correct?

A few other questions come to mind:

  1. Is the entire dataset read in 4 times with the tmp_df = pd.read_pickle('./embed_pairs_df_million.pkl') line? If so, is there a more efficient way to read it in for this setup?

  2. The forward() method returns the cosine similarity (or it will once I write it) between two embeddings. If calc_cos_sims() is copied to each process, would I need to replace the mp.spawn() line with all_cos_sims = mp.spawn() in order to store the results from all the GPUs?

Thanks in advance for your help!

Thanks for providing a code example. IIUC, you want to just compute a particular math function in parallel and not really train a model in data parallel fashion where you aggregate gradients across ranks in the backward pass? If so, you don’t really need to use DistributedDataParallel.

No, mp.spawn() will not aggregate the results for you; each process would have its results local to that process. You can collect all the results in one process using a collective operation like all_gather. See: Distributed communication package - torch.distributed — PyTorch 2.1 documentation
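For example, an untested sketch of the all_gather pattern (it assumes init_process_group has already been called and that all ranks produce tensors of the same length; otherwise pad, or use all_gather_object):

import torch
import torch.distributed as dist


def gather_results(cosine_tensor, world_size):
    # After all_gather, every rank holds all shards and can concatenate them.
    gathered = [torch.zeros_like(cosine_tensor) for _ in range(world_size)]
    dist.all_gather(gathered, cosine_tensor)
    return torch.cat(gathered)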

  1. Is the entire dataset read in 4 times with the tmp_df = pd.read_pickle('./embed_pairs_df_million.pkl') line? If so, is there a more efficient way to read it in for this setup?

Yes, it is read 4 times. I’m not very familiar with pandas, but if you can pre-split the data beforehand and read only what is required on each process, then you would avoid reading the data 4 times.
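For example, a one-time preprocessing script along these lines could do the splitting (the part-file names here are just an example):

import numpy as np
import pandas as pd

# Run once, outside the worker processes: split the big dataframe into one
# pickle per rank so each process only reads its own shard.
df = pd.read_pickle('./embed_pairs_df_million.pkl')
for rank, part in enumerate(np.array_split(df, 4)):
    part.to_pickle(f'./embed_pairs_df_million_part_{rank}.pkl')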

  2. The forward() method returns the cosine similarity (or it will once I write it) between two embeddings. If calc_cos_sims() is copied to each process, would I need to replace the mp.spawn() line with all_cos_sims = mp.spawn() in order to store the results from all the GPUs?

As mentioned above, mp.spawn will not aggregate results for you. You will have to use something like all_gather to do that.

@pritamdamania87 thank you for your explanations. They are really helping out!

Thanks for providing a code example. IIUC, you want to just compute a particular math function in parallel and not really train a model in data parallel fashion where you aggregate gradients across ranks in the backward pass? If so, you don’t really need to use DistributedDataParallel.

Yes, this is right. I want to do a calculation in parallel and collect the results. Is there a better way to do this in PyTorch?

Any further thoughts on doing generic calculations in PyTorch using multiple GPUs on a single machine? I’m not trying to train a model or anything, just trying to utilize PyTorch to make these calculations across the multiple GPUs available to me.

@aclifton314 You can perform generic calculations in PyTorch using multiple GPUs, similar to the code example you provided. Basically, spawn multiple processes where each process drives a single GPU and does part of the computation. Then you can use the PyTorch collective APIs to perform any aggregations across GPUs that you need.
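A rough skeleton of that pattern (untested; the 'nccl' backend, the address/port values, and the random stand-in data are assumptions; in your setup each rank would load its own part file instead):

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn.functional as F


def worker(rank, world_size):
    # One process per GPU; 'nccl' is the usual backend for CUDA tensors.
    os.environ.setdefault('MASTER_ADDR', '127.0.0.1')
    os.environ.setdefault('MASTER_PORT', '29500')
    dist.init_process_group('nccl', rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

    # Stand-in for this rank's shard of the real data,
    # e.g. loaded from './embed_pairs_df_million_part_<rank>.pkl'.
    chunk = torch.randn(250_000, 2, 300, device=f'cuda:{rank}')
    sims = F.cosine_similarity(chunk[:, 0], chunk[:, 1], dim=1)

    # Aggregate across GPUs: after all_gather, every rank holds the full result.
    gathered = [torch.zeros_like(sims) for _ in range(world_size)]
    dist.all_gather(gathered, sims)
    full = torch.cat(gathered)   # post-process / save / move to CPU here

    dist.destroy_process_group()


if __name__ == '__main__':
    world_size = 4  # one process per GPU
    mp.spawn(worker, args=(world_size,), nprocs=world_size, join=True)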

@pritamdamania87 Here is what I now have:

import torch
import torch.multiprocessing as mp
import torch.distributed as dist
import torch.nn.functional as F
import pandas as pd


def calc_cos_sims(rank, world_size):
    dist.init_process_group('gloo', rank=rank, world_size=world_size)
    cuda_device = torch.device('cuda:'+str(rank))
    data_path = './embed_pairs_df_million_part_' + str(rank) + '.pkl'
    tmp_df = pd.read_pickle(data_path)

    embeds_a_list = [embed_a for embed_a in tmp_df['embeds_a']]
    embeds_b_list = [embed_b for embed_b in tmp_df['embeds_b']]

    embeds_a_tensor = torch.tensor(embeds_a_list, device=cuda_device)
    embeds_b_tensor = torch.tensor(embeds_b_list, device=cuda_device)
    cosine_tensor = F.cosine_similarity(embeds_a_tensor, embeds_b_tensor)
    


def main():
    world_size = 4 #since I have 4 GPUs on a single machine
    mp.spawn(calc_cos_sims,
             args=(world_size,),
             nprocs=world_size,
             join=True)

if __name__ == '__main__':
    main()

This version assumes that the data has been split up into 4 different parts (1 part for each GPU), which is what data_path = './embed_pairs_df_million_part_' + str(rank) + '.pkl' points to. Since each entry in a column is a numpy array, I went ahead and converted everything to PyTorch tensors. cosine_tensor is the cosine similarity for each pair in the data split. I read the link you posted about aggregating, but I’m not entirely sure how to implement it. How would that be done in this case?

Also, have I called mp.spawn correctly? I pass args=(world_size,), but calc_cos_sims(rank, world_size) takes the rank first, followed by the world size, and both go into dist.init_process_group('gloo', rank=rank, world_size=world_size).

@pritamdamania87
One additional question. I read through the collective APIs you linked to, but I don’t quite understand them and can’t figure out which one would be applicable in this case.

Any further ideas on this?

Sorry about the delay here.

Yes, this is correct: mp.spawn passes the rank as the first argument, so args=(world_size,) lines up with calc_cos_sims(rank, world_size).

I read the link you posted about aggregating, but I’m not entirely sure how to implement it. How would that be done in this case?

It really depends on the type of aggregation you want to do across cosine_tensor. You basically have a total of 4 cosine_tensor tensors (one on each rank) and you want to aggregate them. For example, if you want to sum them all up and get the total sum on each rank, you can do this:

dist.all_reduce(cosine_tensor)
# Now cosine_tensor would be overwritten to contain the sum of all the 4 `cosine_tensor`.

This tutorial will probably give a much better overview of how to use all of these collective APIs: Writing Distributed Applications with PyTorch — PyTorch Tutorials 2.1.1+cu121 documentation

@pritamdamania87 Thank you for your reply, it was very helpful. Here is the code I now have:

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn.functional as F
import pandas as pd


def calc_cos_sims(rank, world_size):
    group = dist.new_group([0, 2])
    cuda_device = torch.device('cuda:'+str(rank))

    data_path = './embed_pairs_df_8000_part' + str(rank) + '.pkl'
    tmp_df = pd.read_pickle(data_path)
    embeds_a_list = [embed_a for embed_a in tmp_df['embeds_a']]
    embeds_b_list = [embed_b for embed_b in tmp_df['embeds_b']]

    embeds_a_tensor = torch.tensor(embeds_a_list, device=cuda_device)
    embeds_b_tensor = torch.tensor(embeds_b_list, device=cuda_device)

    cosine_tensor = F.cosine_similarity(embeds_a_tensor, embeds_b_tensor)
    cosine_tensors_concat = dist.gather(cosine_tensor, group=group)


def init_process(rank, size, fn, backend='gloo'):
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


def main():
    world_size = 2
    processes = []
    mp.set_start_method("spawn")
    for rank in range(world_size):
        p = mp.Process(target=init_process, args=(rank, world_size, calc_cos_sims))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()



if __name__ == '__main__':
    main()
    print('DONE!')

However, I am getting the following error:

Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/aclifton/jca-ai/classification_embeds/pytorch_example_multi_proc.py", line 46, in init_process
    fn(rank, size)
  File "/home/aclifton/jca-ai/classification_embeds/pytorch_example_multi_proc.py", line 14, in calc_cos_sims
    group = dist.new_group([0, 2])
  File "/home/aclifton/.local/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 2682, in new_group
    raise RuntimeError("The new group's rank should be within the "
RuntimeError: The new group's rank should be within the the world_size set by init_process_group

(both spawned processes raise the same error; their tracebacks are interleaved on stderr)

Also, how would I go about assigning the gathered tensor to a variable for further processing inside the main() function?

What is the reason you are using new_group? new_group should be used only if you want to use a subset of the processes. Otherwise you can just do dist.gather(cosine_tensor) with the default group, which already spans the whole world_size.

The problem in your code is that world_size is 2 and you specify [0, 2] to new_group. new_group should be given a list of ranks in the range [0, world_size - 1], so [0, 1], [1] or [0] are valid arguments. However, if you just want to use [0, 1], that is actually the entire world_size, and you don’t really need the new_group API for that; you can just use the default group.
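For completeness, an untested sketch of gather on the default group (only the destination rank supplies gather_list, and all tensors must have the same shape; depending on the backend you may need to move the tensors to the CPU first, or use all_gather instead):

import torch
import torch.distributed as dist


def gather_on_rank0(cosine_tensor, rank, world_size):
    # Collect every rank's tensor on rank 0 only.
    if rank == 0:
        gather_list = [torch.zeros_like(cosine_tensor) for _ in range(world_size)]
        dist.gather(cosine_tensor, gather_list=gather_list, dst=0)
        return torch.cat(gather_list)  # full result, only on rank 0
    dist.gather(cosine_tensor, dst=0)
    return None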

You can create a pipe in the main process and pass it to the child processes. Then the child processes can write the cosine_tensor to the pipe, which the main process can read and process. You can read these docs to better understand how to use a pipe for communication: multiprocessing — Process-based parallelism — Python 3.12.0 documentation
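A rough sketch of that pipe idea (untested; the worker body is a stand-in for the real computation, and results should be moved to the CPU before being sent):

import torch
import torch.multiprocessing as mp


def worker(rank, conn):
    # Stand-in for the real computation; send the (CPU) result back to the
    # parent through this child's end of the pipe, then close it.
    result = torch.arange(3) + rank
    conn.send(result)
    conn.close()


if __name__ == '__main__':
    world_size = 2
    ctx = mp.get_context('spawn')
    parent_conns, procs = [], []
    for rank in range(world_size):
        parent_conn, child_conn = ctx.Pipe()
        p = ctx.Process(target=worker, args=(rank, child_conn))
        p.start()
        parent_conns.append(parent_conn)
        procs.append(p)

    # Read from the pipes before joining so the children can finish sending.
    results = [conn.recv() for conn in parent_conns]
    for p in procs:
        p.join()

    all_results = torch.cat(results)  # now available in the main process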