# Multi-GPU Calculations

I want to use PyTorch for its tensor math, not necessarily for training an ML model. I have a machine with 4 GPUs. The overall goal is to calculate the cosine similarity between pairs of embeddings using all 4 GPUs. Here is a walkthrough of what I have so far:

Read the data in as a pandas dataframe

``````
import pandas as pd

tmp_df = pd.read_pickle('./embed_pairs_df_million.pkl')
``````

There are 2 columns in `tmp_df`: `embeds_a` and `embeds_b`. Each column contains 1 million rows, making `tmp_df.shape = (1000000, 2)`. Each element in a row is a PyTorch tensor of shape `(300, 1)`. I’ve already worked out the combinations of pairs and stored them appropriately in `tmp_df`. As such, `tmp_df['embeds_a'][i]` and `tmp_df['embeds_b'][i]` constitute the ith pair of embeddings that needs to be run through a cosine similarity function.

My question is, given the format of the data and the 4 GPUs available, what is the best way to distribute the cosine similarity calculation across all 4 GPUs in parallel?

Any thoughts on this?

This sounds like a data parallel problem, although the nested structure seems like it might be an issue if you need to “unpack” PyTorch tensors from Python data structures.

For this case, you might want to consider preprocessing the data into a single higher-dimensional tensor, e.g., one of shape `(N, 2, 300)`, where `N` is the number of pairs.

From here, it becomes a straightforward data-parallel problem that shouldn’t take long on 4 GPUs (or likely even a single GPU): split the data into equal-sized parts, move the different parts to different GPUs, and aggregate the results.

At this point I’d be concerned that all of the preprocessing time (e.g., the steps to generate `tmp_df` and pack it into a large tensor) would be significantly greater than the calculation time for cosine similarity.
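The steps described above (pack, split, move, aggregate) can be sketched roughly as follows. This is a minimal sketch, assuming the embeddings arrive as two equal-length Python sequences of PyTorch tensors of shape `(300, 1)`, standing in for the `embeds_a` and `embeds_b` columns. The helper name `pairwise_cosine` and its `devices` parameter are hypothetical, and the code falls back to the CPU when no GPU is visible.

```python
import torch
import torch.nn.functional as F


def pairwise_cosine(embeds_a, embeds_b, devices=None):
    """Cosine similarity of the i-th pair, spread over the given devices.

    embeds_a / embeds_b: equal-length sequences of tensors of shape (D, 1)
    or (D,). Returns a 1-D CPU tensor with one similarity per pair.
    """
    if devices is None:
        n_gpus = torch.cuda.device_count()
        devices = [f"cuda:{i}" for i in range(n_gpus)] or ["cpu"]

    # Pack the nested structure into one tensor of shape (N, 2, D).
    a = torch.stack([e.reshape(-1) for e in embeds_a])
    b = torch.stack([e.reshape(-1) for e in embeds_b])
    packed = torch.stack([a, b], dim=1)

    # One roughly equal-sized chunk per device. CUDA kernels launch
    # asynchronously, so work on different devices may overlap.
    chunks = torch.chunk(packed, len(devices), dim=0)
    partial = []
    for chunk, device in zip(chunks, devices):
        chunk = chunk.to(device)
        partial.append(F.cosine_similarity(chunk[:, 0], chunk[:, 1], dim=1))

    # Aggregate on the CPU; concatenation preserves the original row order.
    return torch.cat([p.cpu() for p in partial])
```

Called as `pairwise_cosine(list(tmp_df['embeds_a']), list(tmp_df['embeds_b']))`, this would stay in a single process. Whether it is fast enough compared with one process per GPU depends on how long the packing step takes, which is the concern raised above.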

You could write a custom network (subclass of `torch.nn.Module`) whose `forward()` function computes the cosine similarity, and then use `DataParallel` or `DistributedDataParallel` to run your data through this network.

Note: This is just a hunch, I haven’t done something similar (well, except all the training I do is in fact quite similar …) so there may be pitfalls which I don’t see.

@gphilip Thanks for your response. I’ve been going off the example detailed here, but I have a few questions about testing this out. Here is what I have so far:

``````
from torch import nn
import torch.multiprocessing as mp
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

class CosSimNetwork(nn.Module):
    def __init__(self):
        super(CosSimNetwork, self).__init__()

    def forward(self, embeds):
        # Here do the cosine similarity calculation
        cos_sims = 12  # placeholder
        return cos_sims

def calc_cos_sims(rank, world_size):
    dist.init_process_group('gloo', rank=rank, world_size=world_size)
    model = CosSimNetwork()
    ddp_model = DDP(model, device_ids=[rank])

    # DATA is a placeholder for the embeddings, which are not loaded yet
    cos_sims = ddp_model(DATA.to(rank))

def main():
    world_size = 4  # since I have 4 GPUs on a single machine
    mp.spawn(calc_cos_sims,
             args=(world_size,),
             nprocs=world_size,
             join=True)

if __name__ == '__main__':
    main()
``````

If I understand correctly, because I have 4 GPUs `world_size = 4`. However, I’m not entirely sure what `rank` should be given that I have 4 GPUs. Also, I have my data stored in a pickled pandas dataframe with two columns being the embeddings that would be used in the cosine similarity calculation. It looks something like `cos_sim(embed_a, embed_b)`. I’m not entirely sure how to read in the data properly for `DistributedDataParallel` and the `forward` method of `CosSimNetwork`. Any advice is much appreciated.

For `DistributedDataParallel`, you need to run one process per GPU and use ranks 0-3 for GPUs 0-3. I think your example code above should already do that automatically, since `mp.spawn` passes the appropriate rank as the first argument. For reading the data, each process can read a chunk of the data and process it. For example, you could create 4 chunks of the `./embed_pairs_df.pkl` file, or read the same file on all processes and have each process handle only part of the input: process 0 computes rows 0, 4, 8, and so on.
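The two ways of dividing rows among ranks mentioned above can be sketched in plain Python. The helper names `strided_rows` and `contiguous_rows` are hypothetical; with a DataFrame, the resulting indices could be passed to something like `tmp_df.iloc[...]`.

```python
def strided_rows(num_rows, rank, world_size):
    """Rows rank, rank + world_size, rank + 2 * world_size, ..."""
    return range(rank, num_rows, world_size)


def contiguous_rows(num_rows, rank, world_size):
    """One contiguous block per rank; the first ranks absorb any remainder."""
    base, extra = divmod(num_rows, world_size)
    start = rank * base + min(rank, extra)
    stop = start + base + (1 if rank < extra else 0)
    return range(start, stop)


if __name__ == "__main__":
    for rank in range(4):
        print(rank,
              list(strided_rows(10, rank, 4)),
              list(contiguous_rows(10, rank, 4)))
```

Contiguous blocks make it easy to concatenate the per-rank results back into the original row order, while strided assignment needs the indices to be carried along with the results.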

I have not used `DistributedDataParallel` yet, so I am not sure about the right way to use it.

I would suggest that you first write code that works on a single GPU, just to ensure that it works properly without the parallelization. You could then try using `DataParallel` (which involves adding just one extra line of code) to see if that speeds it up sufficiently for your needs. Maybe it will, and then you don’t have to worry about the more involved `DistributedDataParallel`.
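A rough sketch of that progression, from a single device to `DataParallel`, might look like the following. It assumes the pairs have already been packed into two tensors of shape `(N, 300)`; the names `CosSim` and `run` are hypothetical, and the multi-GPU branch is untested here.

```python
import torch
from torch import nn
import torch.nn.functional as F


class CosSim(nn.Module):
    """Parameter-free module: forward() returns one similarity per row."""

    def forward(self, a, b):
        return F.cosine_similarity(a, b, dim=1)


def run(a, b):
    model = CosSim()
    if torch.cuda.device_count() > 1:
        # The one extra line: split the batch dimension across visible GPUs.
        model = nn.DataParallel(model)
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)
    with torch.no_grad():  # no gradients are needed for a pure calculation
        return model(a.to(device), b.to(device)).cpu()
```

On a machine with zero or one GPU the wrapper is skipped, so the same function can be used to check correctness before scaling out.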

@pritamdamania87 thank you very much for your reply. I believe I have implemented your suggestion, but if you don’t mind checking it for me to make sure it makes sense:

``````
from torch import nn
import torch.multiprocessing as mp
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
import pandas as pd

class CosSimNetwork(nn.Module):
    def __init__(self):
        super(CosSimNetwork, self).__init__()

    def forward(self, embeds):
        cos_sims = 12  # placeholder
        return cos_sims

def determine_subranges(fullrange: tuple, num_subranges: int):
    # Split (start, stop) into consecutive (start, stop) sub-ranges
    subranges = []
    inc = fullrange[1] // num_subranges
    for i in range(fullrange[0], fullrange[1], inc):
        subranges.append((i, min(i + inc, fullrange[1])))
    return subranges

def calc_cos_sims(rank, world_size):
    dist.init_process_group('gloo', rank=rank, world_size=world_size)
    model = CosSimNetwork()
    ddp_model = DDP(model, device_ids=[rank])
    tmp_df = pd.read_pickle('./embed_pairs_df_million.pkl')
    sub_ranges = determine_subranges((0, tmp_df.shape[0]), world_size)
    sub_range_tuple = sub_ranges[rank]
    data = tmp_df.iloc[sub_range_tuple[0]:sub_range_tuple[1]]
    cos_sims = ddp_model(data.to(rank))

def main():
    world_size = 4  # since I have 4 GPUs on a single machine
    mp.spawn(calc_cos_sims,
             args=(world_size,),
             nprocs=world_size,
             join=True)

if __name__ == '__main__':
    main()
``````

If I understand this correctly, `mp.spawn()` will create 4 different processes (based off of `world_size`) that will run `calc_cos_sims()`. The input data is sliced into `world_size` chunks and the `rank` determines which chunk gets sent to which GPU. Once all the processes are finished, `mp.spawn()` will aggregate the results. Is my understanding correct?

A few other questions come to mind:

1. Is the entire dataset read in 4 times with the `tmp_df = pd.read_pickle('./embed_pairs_df_million.pkl')` line? If so, is there a more efficient way to read it in for this setup?

2. The `forward()` method returns the cosine similarity (or it will once I write it) between two embeddings. If `calc_cos_sims()` is copied to each process, would I need to replace the `mp.spawn()` line with `all_cos_sims = mp.spawn()` in order to store the results from all the GPUs?

Thanks for providing a code example. IIUC, you want to just compute a particular math function in parallel and not really train a model in data parallel fashion where you aggregate gradients across ranks in the backward pass? If so, you don’t really need to use DistributedDataParallel.

No, each process would have its results local to that process. You can collect all the results using a collective operation like `all_gather` (every process receives them) or `gather` (a single destination process receives them). See: Distributed communication package - torch.distributed — PyTorch 1.9.0 documentation

1. Is the entire dataset read in 4 times with the `tmp_df = pd.read_pickle('./embed_pairs_df_million.pkl')` line? If so, is there a more efficient way to read it in for this setup?

Yes, it is read 4 times. I’m not very familiar with pandas, but if you can pre-split the data beforehand and read only what is required on each process, then you would avoid reading the data 4 times.

2. The `forward()` method returns the cosine similarity (or it will once I write it) between two embeddings. If `calc_cos_sims()` is copied to each process, would I need to replace the `mp.spawn()` line with `all_cos_sims = mp.spawn()` in order to store the results from all the GPUs?

As mentioned above, `mp.spawn` will not aggregate results for you. You will have to use something like `all_gather` to do that.
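As a rough illustration of the `all_gather` suggestion, the sketch below collects every rank's result on every rank. The helper name `gather_all` is hypothetical. It assumes `dist.init_process_group()` has already been called in each process and that all ranks hold tensors of the same shape and dtype; when no process group exists (a plain single-process run), it simply returns a copy of the local tensor.

```python
import torch
import torch.distributed as dist


def gather_all(local_result):
    """Return the concatenation of every rank's tensor, on every rank."""
    if not (dist.is_available() and dist.is_initialized()):
        # Single-process run (e.g. debugging on one GPU): nothing to gather.
        return local_result.clone()

    world_size = dist.get_world_size()
    # all_gather fills one pre-allocated buffer per rank.
    buffers = [torch.empty_like(local_result) for _ in range(world_size)]
    dist.all_gather(buffers, local_result)
    # buffers[i] holds rank i's tensor, so concatenation keeps rank order.
    return torch.cat(buffers)
```

Inside `calc_cos_sims()` this could be called as `gather_all(cosine_tensor.cpu())`. Moving the tensor to the CPU first is a cautious choice for the `gloo` backend, whose documented GPU support is limited; the `nccl` backend is the one intended for CUDA tensors.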

@pritamdamania87 thank you for your explanations. They are really helping out!

Thanks for providing a code example. IIUC, you want to just compute a particular math function in parallel and not really train a model in data parallel fashion where you aggregate gradients across ranks in the backward pass? If so, you don’t really need to use DistributedDataParallel.

Yes this is right. I’m wanting to do a calculation in parallel and collect the results. Is there a better way to do this in pytorch?

Any further thoughts on doing generic calculations in PyTorch using multiple GPUs on a single machine? I’m not trying to train a model or anything, just use PyTorch to run these calculations across the multiple GPUs available to me.

@aclifton314 You can perform generic calculations in pytorch using multiple gpus similar to the code example you provided. Basically spawn multiple processes where each process drives a single GPU and have each GPU do part of the computation. Then you can use PyTorch collective APIs to perform any aggregations across GPUs that you need.

@pritamdamania87 Here is what I now have:

``````
import torch
import torch.multiprocessing as mp
import torch.distributed as dist
import torch.nn.functional as F
import pandas as pd

def calc_cos_sims(rank, world_size):
    dist.init_process_group('gloo', rank=rank, world_size=world_size)
    cuda_device = torch.device('cuda:' + str(rank))
    # Each rank reads only its own pre-split part of the data
    data_path = './embed_pairs_df_million_part_' + str(rank) + '.pkl'
    tmp_df = pd.read_pickle(data_path)

    embeds_a_list = [embed_a for embed_a in tmp_df['embeds_a']]
    embeds_b_list = [embed_b for embed_b in tmp_df['embeds_b']]

    embeds_a_tensor = torch.tensor(embeds_a_list, device=cuda_device)
    embeds_b_tensor = torch.tensor(embeds_b_list, device=cuda_device)
    cosine_tensor = F.cosine_similarity(embeds_a_tensor, embeds_b_tensor)

def main():
    world_size = 4  # since I have 4 GPUs on a single machine
    mp.spawn(calc_cos_sims,
             args=(world_size,),
             nprocs=world_size,
             join=True)

if __name__ == '__main__':
    main()
``````

This has changed to assume that the data has been split up into 4 different parts (1 part for each GPU) by `data_path = './embed_pairs_df_million_part_' + str(rank) + '.pkl'`. Since each entry in a column is a numpy array, I went ahead and converted everything to pytorch tensors. `cosine_tensor` is the cosine similarity between each element of the data split. I read the link you posted about aggregating, but I’m not entirely sure how to implement it. How would that be done in this case?

Also, have I called `mp.spawn` correctly? I have `args=(world_size,)` but `calc_cos_sims(rank, world_size)` has the rank first followed by the world size and those go into `dist.init_process_group('gloo', rank=rank, world_size=world_size)`.

@pritamdamania87
One additional question. I read through the collective APIs you linked to, but I don’t quite understand them and can’t figure out which one would be applicable in this case.

Any further ideas on this?

Yes this is correct.

I read the link you posted about aggregating, but I’m not entirely sure how to implement it. How would that be done in this case?

It really depends on the type of aggregation you want to do across `cosine_tensor`. You basically have a total of 4 `cosine_tensor` (one on each rank) and you want to aggregate them. For example if you want to sum them all up and get the total sum on each rank you can do this:

``````
dist.all_reduce(cosine_tensor)
# cosine_tensor is now overwritten with the sum of all 4 `cosine_tensor`s.
``````

This tutorial will probably give a much better overview of how to use all of these collective APIs: Writing Distributed Applications with PyTorch — PyTorch Tutorials 1.9.1+cu102 documentation

@pritamdamania87 Thank you for your reply, it was very helpful. Here is the code I now have:

``````
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn.functional as F
import pandas as pd

def calc_cos_sims(rank, world_size):
    group = dist.new_group([0, 2])
    cuda_device = torch.device('cuda:' + str(rank))

    data_path = './embed_pairs_df_8000_part' + str(rank) + '.pkl'
    tmp_df = pd.read_pickle(data_path)
    embeds_a_list = [embed_a for embed_a in tmp_df['embeds_a']]
    embeds_b_list = [embed_b for embed_b in tmp_df['embeds_b']]

    embeds_a_tensor = torch.tensor(embeds_a_list, device=cuda_device)
    embeds_b_tensor = torch.tensor(embeds_b_list, device=cuda_device)

    cosine_tensor = F.cosine_similarity(embeds_a_tensor, embeds_b_tensor)
    cosine_tensors_concat = dist.gather(cosine_tensor, group=group)

def init_process(rank, size, fn, backend='gloo'):
    # The default env:// rendezvous needs both an address and a port
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)

def main():
    world_size = 2
    processes = []
    mp.set_start_method("spawn")
    for rank in range(world_size):
        p = mp.Process(target=init_process, args=(rank, world_size, calc_cos_sims))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

if __name__ == '__main__':
    main()
    print('DONE!')
``````

However, I am getting the following error:

``````
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
File "/home/aclifton/jca-ai/classification_embeds/pytorch_example_multi_proc.py", line 46, in init_process
fn(rank, size)
File "/home/aclifton/jca-ai/classification_embeds/pytorch_example_multi_proc.py", line 14, in calc_cos_sims
group = dist.new_group([0, 2])
File "/home/aclifton/.local/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 2682, in new_group
raise RuntimeError("The new group's rank should be within the "
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
RuntimeError: The new group's rank should be within the the world_size set by init_process_group
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/aclifton/jca-ai/classification_embeds/pytorch_example_multi_proc.py", line 46, in init_process
fn(rank, size)
File "/home/aclifton/jca-ai/classification_embeds/pytorch_example_multi_proc.py", line 14, in calc_cos_sims
group = dist.new_group([0, 2])
File "/home/aclifton/.local/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 2682, in new_group
raise RuntimeError("The new group's rank should be within the "
RuntimeError: The new group's rank should be within the the world_size set by init_process_group
``````

Also, how would I go about assigning the gathered tensor to a variable for further processing inside the `main()` function?

What is the reason you are using `new_group`? `new_group` should be used only if you want to use a subset of the processes. Otherwise, you can just call `dist.gather(cosine_tensor)`, which uses the default group containing all `world_size` processes.

The problem in your code is that `world_size` is 2 and you pass `[0, 2]` to `new_group`. `new_group` takes a list of ranks in the range [0, world_size - 1], so `[0, 1]`, `[0]`, or `[1]` are valid arguments. However, `[0, 1]` is actually the entire world, so you don’t need the `new_group` API for that and can just use the default group.

You can create a pipe in the main process and pass it to the child processes. Then the child processes can write the `cosine_tensor` to the pipe which the main process can read and process. You can read these docs to better understand how to use a pipe for communication: multiprocessing — Process-based parallelism — Python 3.10.0 documentation
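The pipe approach could be sketched along these lines, using only the standard library. The names `worker` and `run_workers` are hypothetical, and a small list of floats stands in for the real per-rank result (in the code above that might be something like `cosine_tensor.cpu().tolist()`).

```python
import multiprocessing as mp


def worker(rank, conn):
    """Compute this rank's share and send it back through the pipe."""
    # Stand-in for the real per-rank cosine similarity calculation.
    result = [float(rank)] * 3
    conn.send((rank, result))
    conn.close()


def run_workers(world_size):
    """Start one process per rank and return their results ordered by rank."""
    # CUDA needs the "spawn" (or "forkserver") start method rather than "fork".
    ctx = mp.get_context("spawn")
    pipes, procs = [], []
    for rank in range(world_size):
        # duplex=False returns (receiving end, sending end).
        parent_conn, child_conn = ctx.Pipe(duplex=False)
        p = ctx.Process(target=worker, args=(rank, child_conn))
        p.start()
        child_conn.close()  # the parent keeps only the receiving end
        pipes.append(parent_conn)
        procs.append(p)

    # Receive before join(): a child may block in send() until its data is read.
    results = dict(conn.recv() for conn in pipes)
    for p in procs:
        p.join()
    return [results[rank] for rank in range(world_size)]


if __name__ == "__main__":
    print(run_workers(2))
```

Since `torch.multiprocessing` wraps the standard `multiprocessing` module, the same structure should carry over to the `mp.Process` loop in the code above, with `run_workers()` returning the values that `main()` can process further.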