On Windows, this works with a single GPU, but fails with two or more GPUs using either of the following settings:
- init_method=r"file://D:/torch-dist-train/test.log"
- init_method=r"tcp://10.4.220.11:1000"
Can anyone help with this?
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
def run_ddp(rank, world_size):
    """Run one forward/backward/step training cycle of a DDP-wrapped linear model.

    Assumes the process group has already been initialized and that device
    index `rank` is valid on this machine (i.e. one GPU per rank).
    """
    # Place the local replica on this rank's device, then wrap it so that
    # gradients are all-reduced across ranks during backward().
    local_model = nn.Linear(10, 10).to(rank)
    ddp_model = DDP(local_model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # Single training step on random data: forward pass ...
    predictions = ddp_model(torch.randn(20, 10).to(rank))
    targets = torch.randn(20, 10).to(rank)
    # ... backward pass (triggers DDP gradient synchronization) ...
    loss_fn(predictions, targets).backward()
    # ... and the parameter update.
    optimizer.step()
def all_reduce(rank, size):
    """Sum-all-reduce a ones tensor across every rank in the job.

    Fix: the group was previously hard-coded to ranks [0, 1, 2], which
    fails (or silently excludes ranks) whenever world_size != 3 — this
    script is launched with world_size 1 or 2. Build the group from the
    actual `size` instead.
    """
    group = dist.new_group(list(range(size)))
    tensor = torch.ones(3)
    print('{} : Before allreduce: Rank {} has data {}'
          .format(os.getpid(), rank, tensor))
    # Every rank contributes `tensor`; afterwards each rank holds the sum.
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    print('{} : After allreduce: Rank {} has data {}'
          .format(os.getpid(), rank, tensor))
def broadcast(rank, size):
    """Broadcast rank 0's tensor (zeros) to every other rank in the job.

    Fix: the group was previously hard-coded to ranks [0, 1], breaking any
    world size other than 2; build it from the actual `size`. Also corrects
    the "braodcasting" typo in the log messages.
    """
    group = dist.new_group(list(range(size)))
    # Rank 0 holds the source value (zeros); the others start with ones so
    # the effect of the broadcast is visible in the printed output.
    if rank == 0:
        tensor = torch.zeros(3)
    else:
        tensor = torch.ones(3)
    print('{} : Before broadcasting: Rank {} has data {}'
          .format(os.getpid(), rank, tensor))
    dist.broadcast(tensor, src=0, group=group)
    print('{} : After broadcasting: Rank {} has data {}'
          .format(os.getpid(), rank, tensor))
def init_process(rank, size, fn, backend='gloo'):
    """Initialize the distributed process group, then run `fn(rank, size)`.

    Notes on the reported multi-GPU failure on Windows:
    - tcp:// rendezvous must use a reachable interface and a free,
      *unprivileged* port. Port 1000 is below 1024 and is commonly blocked
      by the OS/firewall; 29500 is PyTorch's conventional default.
    - file:// rendezvous requires a path that does NOT already exist —
      a stale file left over from a previous run makes the ranks hang.
    The address/port are configurable via MASTER_ADDR / MASTER_PORT so the
    same code works single-machine (default 127.0.0.1) or across machines.
    """
    addr = os.environ.get('MASTER_ADDR', '127.0.0.1')
    port = os.environ.get('MASTER_PORT', '29500')
    dist.init_process_group(
        backend,
        init_method=f'tcp://{addr}:{port}',
        rank=rank,
        world_size=size,
    )
    print(f"rank:{rank}")
    fn(rank, size)
def test(size=1):
    """Spawn `size` worker processes and wait for them all to finish.

    Fix/generalization: `size` was hard-coded to 1, yet the collective
    functions in this file (all_reduce, broadcast) need at least two
    ranks — it is now a parameter, defaulting to 1 for backward
    compatibility. Pass size=2 (or more) to exercise multi-GPU runs.
    """
    processes = []
    for rank in range(size):
        p = Process(target=init_process, args=(rank, size, run_ddp))
        p.start()
        processes.append(p)
    # Wait for every worker; a failure in any rank surfaces as a non-zero
    # exitcode on its Process object.
    for p in processes:
        p.join()
if __name__ == "__main__":
    # Fix: removed the redundant `import os` — it is already imported at the
    # top of the file.
    # Uncomment to pin gloo to a specific network interface when the
    # rendezvous binds to the wrong NIC:
    # os.environ['GLOO_SOCKET_IFNAME'] = '10.4.220.11'
    print(dist.is_available())
    test()