@ptrblck sorry for calling you out like this… but I was wondering, is there an explanation of what this warning means?
[W ProcessGroupGloo.cpp:558] Warning: Unable to resolve hostname to a (local) address. Using the loopback address as fallback. Manually set the network interface to bind to with GLOO_SOCKET_IFNAME. (function operator())
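If I read the message literally, Gloo could not resolve my machine's hostname to a local address, fell back to 127.0.0.1, and the suggested workaround is to pin the network interface explicitly. A minimal sketch of that suggestion, assuming the interface name is machine-specific ('lo' below is just a placeholder, check `ip addr` / `ifconfig` for the real one):

import os
# pin Gloo to an explicit network interface before any init
# 'lo' is a placeholder; use the actual interface name on your machine
os.environ['GLOO_SOCKET_IFNAME'] = 'lo'
# ... then call rpc.init_rpc(...) / dist.init_process_group(...) as usual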
Just in case you need it, here is a self-contained script that triggers the warning:
import os
from typing import Tuple, List

import torch
from torch import nn, optim, Tensor
from torch.distributed import rpc
import torch.distributed.autograd as dist_autograd
from torch.distributed.optim import DistributedOptimizer
from torch.multiprocessing import Pool, Process
import torch.multiprocessing as mp

world_size = 2  # 1 master + 1 worker process
num_epochs = 1  # this doesn't really matter, we only need to test a big batch and a small batch
iters = 5  # iters for 1 epoch
Din, Dout = 10, 5
class ToyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.lin = nn.Linear(Din, Dout)
        self.criterion = nn.MSELoss()

    def forward(self, batch_x, batch_y):
        y_ = self.lin(batch_x)
        loss = self.criterion(batch_y, y_)
        return loss
def get_ast_batch(batch_size: int) -> List[Tuple[Tensor, Tensor]]:
    """
    Returns a list of size batch_size where each element is a single (x, y) example.
    Note (how the chunking works out):
        num_proc = 3
        batch_size = 8  # chunk_size = 8 // 3 = 2, with a leftover chunk for the remainder
        batch_size = 2  # chunk_size = 2 // 3 = 0, so each worker just gets a single example
    :return:
    """
    data_x, data_y = torch.randn(Din), torch.randn(Dout)
    batch = [(data_x, data_y) for _ in range(batch_size)]
    return batch


def get_meta_batch(batch_size: int, k=15) -> Tuple[Tensor, Tensor]:
    """
    Returns a (Tensor[B, K, D_in], Tensor[B, K, D_out]) pair where each element in the batch is a task.
    - 1 example in a batch is a task with K examples of dim=D.
    :return:
    """
    data_x, data_y = torch.randn(batch_size, k, Din), torch.randn(batch_size, k, Dout)
    batch = data_x, data_y
    return batch
class Worker:
    def __init__(self, args, kwargs):
        self.args = args
        self.kwargs = kwargs
        self.id = rpc.get_worker_info().id
        # self.env.seed(args.seed)
class Master:
    def __init__(self, world_size):
        self.world_size = world_size
        self.master_rank = 0
        self.num_workers = world_size - 1
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = '29500'
        rpc.init_rpc(f'worker{self.master_rank}', rank=self.master_rank, world_size=world_size)
        self.model = ToyModel()
        self.optimizer = optim.Adam(self.model.parameters(), lr=1e-2)
        self.saved_losses = {}
        # collect worker infos (remote references) for making rpc calls
        self.workers = []
        for worker_rank in range(1, world_size):
            worker_info = rpc.get_worker_info(worker_name=f'worker{worker_rank}')
            # Make a remote call to run func on the worker and return an RRef to the result immediately.
            # worker_rrf = rpc.remote(to=worker_info, func=Worker, args='args', kwargs='kwargs')
            self.workers.append(worker_info)
            self.saved_losses[worker_info.id] = []
    def forward_parallel(self, batch):
        """
        :param batch: a (Tensor[B, K, D_in], Tensor[B, K, D_out]) pair, one task per batch element
        """
        Sx, Sy = batch
        batch_size = Sx.size(0)  # num_tasks
        chunk_size = batch_size // self.num_workers  # the number of examples to give each worker/proc
        futures = []
        if chunk_size <= 0:
            # fewer examples than workers: give each worker a single data point and that's it
            for t in range(batch_size):
                worker_info = self.workers[t]
                # non-blocking rpc call on the worker; immediately returns a future to wait on for the result
                future = rpc.rpc_async(to=worker_info, func=self.model.forward, args=(Sx[t], Sy[t]))
                futures.append(future)
        else:
            # each worker receives a chunk of size chunk_size
            chunk_idx = 0
            for worker_idx in range(self.num_workers):
                chunk_x = Sx[chunk_idx:chunk_idx + chunk_size]
                chunk_y = Sy[chunk_idx:chunk_idx + chunk_size]
                chunk_idx += chunk_size
                worker_info = self.workers[worker_idx]
                future = rpc.rpc_async(to=worker_info, func=self.model.forward, args=(chunk_x, chunk_y))
                futures.append(future)
        # average the per-chunk losses
        loss = 0
        for future in futures:
            loss += (1.0 / batch_size) * future.wait()
        return loss
    def finish(self):
        def end_master():
            rpc.shutdown()
        master_proc = Process(target=end_master, args=())
        master_proc.start()
def run_worker_process(rank, world_size):
    print('-- Worker run -- ')
    print(f'current process: {mp.current_process()}')
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    # ranks 1 to world_size - 1 are the workers
    rpc.init_rpc(f'worker{rank}', rank=rank, world_size=world_size)
    # block until all rpcs finish, and shutdown the RPC instance
    print('running shutdown')
    rpc.shutdown()
    print(f'shutdown complete rank {rank}')
    print(f'shutdown complete for process: {mp.current_process()}')
def master(world_size):
    print('-- Master run -- ')
    print(f'current process: {mp.current_process()}')
    # -- init master process --
    master = Master(world_size=world_size)
    # -- do training loop --
    optimizer = optim.Adam(params=master.model.parameters(), lr=0.1)
    # optimizer = DistributedOptimizer(optim.Adam, )
    for epoch in range(num_epochs):
        for batch_idx in range(iters):
            # forward_parallel expects the (Tensor[B, K, D_in], Tensor[B, K, D_out]) format
            batch = get_meta_batch(batch_size=batch_idx + 1)
            with dist_autograd.context() as context_id:
                loss = master.forward_parallel(batch)
                dist_autograd.backward(context_id, loss)
                optimizer.step()
                optimizer.zero_grad()
    # block until all rpcs finish, then shut down the RPC instance; once master gets here everything is shut down
    # rpc.shutdown()
    master.finish()
if __name__ == '__main__':
    print('starting __main__')
    for rank in range(1, world_size):
        worker_proc = Process(target=run_worker_process, args=(rank, world_size))
        print(f'creating process object serially: proc_obj is = {worker_proc}')
        # worker_proc.start()
    master(world_size=2)
    print('Done!\a\n')
Solved it by making sure each of the things I initialize over RPC uses a different port number. I guess initializing RPC vs. initializing a distributed process group is different…?
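In case it helps anyone hitting the same thing, here is a minimal sketch of what I mean, assuming the default TensorPipe RPC backend; the port numbers are arbitrary placeholders (any two free ports work). The point is that the RPC layer gets its own init_method port instead of reusing the same MASTER_PORT as the process group rendezvous:

import os
import torch.distributed as dist
from torch.distributed import rpc

os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'  # port used by the (gloo) process group rendezvous

rank, world_size = 0, 1  # single process, just to show the two inits side by side
dist.init_process_group('gloo', rank=rank, world_size=world_size)

# RPC gets its *own* port via init_method instead of reusing MASTER_PORT
rpc.init_rpc(f'worker{rank}', rank=rank, world_size=world_size,
             rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
                 init_method='tcp://127.0.0.1:29501'))

rpc.shutdown()
dist.destroy_process_group()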
related: python - How does one set the pytorch distributed hostname, port and GLOO_SOCKET_IFNAME so that DDP works? - Stack Overflow