@pritamdamania87 Thank you for your reply, it was very helpful. Here is the code I now have:
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn.functional as F
import pandas as pd
def calc_cos_sims(rank, world_size):
group = dist.new_group([0, 2])
cuda_device = torch.device('cuda:'+str(rank))
data_path = './embed_pairs_df_8000_part' + str(rank) + '.pkl'
tmp_df = pd.read_pickle(data_path)
embeds_a_list = [embed_a for embed_a in tmp_df['embeds_a']]
embeds_b_list = [embed_b for embed_b in tmp_df['embeds_b']]
embeds_a_tensor = torch.tensor(embeds_a_list, device=cuda_device)
embeds_b_tensor = torch.tensor(embeds_b_list, device=cuda_device)
cosine_tensor = F.cosine_similarity(embeds_a_tensor, embeds_b_tensor)
cosine_tensors_concat = dist.gather(cosine_tensor, group=group)
def init_process(rank, size, fn, backend='gloo'):
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'
dist.init_process_group(backend, rank=rank, world_size=size)
fn(rank, size)
def main():
world_size = 2
processes = []
mp.set_start_method("spawn")
for rank in range(world_size):
p = mp.Process(target=init_process, args=(rank, world_size, calc_cos_sims))
p.start()
processes.append(p)
for p in processes:
p.join()
if __name__ == '__main__':
main()
print('DONE!')
However, I am getting the following error:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
File "/home/aclifton/jca-ai/classification_embeds/pytorch_example_multi_proc.py", line 46, in init_process
fn(rank, size)
File "/home/aclifton/jca-ai/classification_embeds/pytorch_example_multi_proc.py", line 14, in calc_cos_sims
group = dist.new_group([0, 2])
File "/home/aclifton/.local/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 2682, in new_group
raise RuntimeError("The new group's rank should be within the "
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
RuntimeError: The new group's rank should be within the the world_size set by init_process_group
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/aclifton/jca-ai/classification_embeds/pytorch_example_multi_proc.py", line 46, in init_process
fn(rank, size)
File "/home/aclifton/jca-ai/classification_embeds/pytorch_example_multi_proc.py", line 14, in calc_cos_sims
group = dist.new_group([0, 2])
File "/home/aclifton/.local/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 2682, in new_group
raise RuntimeError("The new group's rank should be within the "
RuntimeError: The new group's rank should be within the the world_size set by init_process_group
Also, how would I go about assigning the gathered tensor to a variable for further processing inside the main()
function?