    if rank == 0:
        print(f"[Rank {rank}] Putting data into queue.")
        queue.put(torch.tensor([rank]))
    else:
        print(f"[Rank {rank}] Waiting for data from queue.")
        data = queue.get()
        print(f"[Rank {rank}] Received data: {data}")
    barrier()
    print(f"[Rank {rank}] Finished processing.")

if __name__ == "__main__":
    mp.spawn(worker, args=(mp.SimpleQueue(),), nprocs=2)
When I run it with torchrun --standalone --nproc_per_node=2 test.py, I get the following error:
[W socket.cpp:464] [c10d] The server socket has failed to bind to [::]:12355 (errno: 98 - Address already in use).
[W socket.cpp:464] [c10d] The server socket has failed to bind to 0.0.0.0:12355 (errno: 98 - Address already in use).
[E socket.cpp:500] [c10d] The server socket has failed to listen on any local network address.
W1225 10:46:25.163304 140356909672256 torch/multiprocessing/spawn.py:145] Terminating process 1532424 via signal SIGTERM
You should not launch the script with torchrun when it calls mp.spawn, since torchrun already spawns the worker processes for you. In your case, you want the Python script itself to spawn the processes and pass in a queue object that is created in the parent process. With torchrun, your script has no parent process of its own in which to create that queue.
The error you are getting means that some other process is already using port 12355, so you should try a different port.
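As a rough illustration of "try another port": one common approach is to ask the operating system for an unused port by binding to port 0. The helper name find_free_port below is hypothetical (it is not part of PyTorch), and this is only a sketch: the port is released before it is returned, so another process could in principle grab it before the process group is initialized.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for a currently unused TCP port (hypothetical helper)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # Binding to port 0 lets the OS choose any free port.
        s.bind((host, 0))
        # getsockname() returns (address, port) for an AF_INET socket.
        return s.getsockname()[1]


# Assumed usage: set this in the parent process before calling mp.spawn, e.g.
#   os.environ["MASTER_PORT"] = str(find_free_port())
port = find_free_port()
print(f"Picked free port {port}")
```

Because the value is written to MASTER_PORT in the parent before spawning, every worker would see the same port.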
You should use Queue() instead of SimpleQueue. Here is an updated snippet that works for me:
import torch.multiprocessing as mp
from torch.distributed import init_process_group, barrier
import os
import torch

def worker(rank, queue):
    init_process_group(backend='nccl', rank=rank, world_size=2, device_id=torch.device(rank))
    if rank == 0:
        print(f"[Rank {rank}] Putting data into queue.")
        queue.put(torch.tensor([rank]))
    else:
        print(f"[Rank {rank}] Waiting for data from queue.")
        data = queue.get()
        print(f"[Rank {rank}] Received data: {data}")
    barrier()
    print(f"[Rank {rank}] Finished processing.")

if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    mp.spawn(worker, args=(mp.Manager().Queue(),), nprocs=2)
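For contrast, under torchrun there is no worker(rank, ...) argument: each process that torchrun launches finds its identity in environment variables such as RANK, LOCAL_RANK and WORLD_SIZE. A minimal sketch of reading them follows. The TorchrunEnv class and read_torchrun_env function are hypothetical names used only for illustration, and the single-process defaults are an assumption so that the script also runs without torchrun.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class TorchrunEnv:
    """Process identity as exported by torchrun (hypothetical container)."""
    rank: int        # global rank across all nodes
    local_rank: int  # rank on this node, typically used to select the GPU
    world_size: int  # total number of processes


def read_torchrun_env(env=None) -> TorchrunEnv:
    """Read RANK / LOCAL_RANK / WORLD_SIZE, defaulting to a single process."""
    env = os.environ if env is None else env
    return TorchrunEnv(
        rank=int(env.get("RANK", 0)),
        local_rank=int(env.get("LOCAL_RANK", 0)),
        world_size=int(env.get("WORLD_SIZE", 1)),
    )


info = read_torchrun_env()
print(f"[Rank {info.rank}] local_rank={info.local_rank} world_size={info.world_size}")
```

Since every process starts independently, an object such as a queue created at the top of the script would be created once per process rather than shared.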
Many thanks! I have one more question. If I use torchrun instead of mp.spawn, is there any way to make variables created in one process available in the other ones? I ask because the data is loaded into CPU memory multiple times when I use DDP in PyTorch. I feel this can be a waste of time and CPU memory.
Different shards of the data should be loaded by multiple processes in DDP; that’s what allows you to train the model on multiple GPUs since the data is split and processed separately. If you need logic that is only performed on one of the ranks you could do:
import os

rank = int(os.environ["RANK"])
if rank == 0:
    # custom logic that should run only on rank 0
    pass
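To make the idea of "different shards per process" concrete, here is a small sketch of how sample indices could be split by rank with a simple stride, so that each process only needs to load its own part. The shard_indices function is a hypothetical helper, and the dataset size of 10 is made up for illustration. In practice torch.utils.data.distributed.DistributedSampler handles this kind of partitioning (with extras such as shuffling and padding that are left out here).

```python
import os


def shard_indices(num_samples: int, rank: int, world_size: int) -> list:
    """Return the sample indices assigned to one rank (hypothetical helper).

    Uses a strided split: rank r gets r, r + world_size, r + 2 * world_size, ...
    """
    return list(range(rank, num_samples, world_size))


# Under torchrun these variables are set for every process; the defaults
# are an assumption so the sketch also runs as a plain single process.
rank = int(os.environ.get("RANK", 0))
world_size = int(os.environ.get("WORLD_SIZE", 1))

my_indices = shard_indices(10, rank, world_size)
print(f"[Rank {rank}] would load samples {my_indices}")
```

With 4 processes and 10 samples, the ranks receive disjoint index lists that together cover every sample exactly once.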
Thanks for explaining! In my code, the data is first loaded into CPU memory (a) and then transferred to the GPU (b). If I use multiprocessing for DDP with, e.g., 4 GPUs, both (a) and (b) are executed 4 times because the entire script runs in every process. Am I right? I feel there may be some way to run (a) once and then transfer the data from CPU memory to the different devices. This could save some time because the data would be loaded into CPU memory only once.