Hi Masters,
I am running the following code on 2 nodes with different numbers of CPU/GPU devices, launching one parameter server (ps) process and a different number of worker processes on each node (e.g. global_ranks: [[0 (ps), 2 (worker), 3 (worker)], [1 (ps), 4 (worker)]]).
Because of CUDA initialization issues, I set mp.set_start_method('spawn', force=True) on the slave node, and that leads to the following crash (NOT just a warning):
/home/simon/anaconda3/lib/python3.6/multiprocessing/semaphore_tracker.py:146: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
len(cache))
Could somebody help? Thanks in advance.
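For reference, here is a stripped-down sketch of what each spawned process on the slave node does (simplified from the full code further below; the broadcast loop and the SharedComponents details are omitted):

# Minimal sketch, slave node only (ps rank 1, worker rank 4), simplified from the full code below
import torch.distributed as dist
import torch.multiprocessing as mp

def run(global_rank, world_size=5):
    dist.init_process_group(backend='gloo',
                            init_method='tcp://192.168.1.12:23457',
                            world_size=world_size,
                            rank=global_rank)
    # ... broadcast loop as in test_run_ps() / test_run_worker() below ...

if __name__ == "__main__":
    mp.set_start_method('spawn', force=True)  # spawn because the child processes touch CUDA
    procs = [mp.Process(target=run, args=(r,)) for r in (1, 4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()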
Results:
$Slave Node
<Process(Process-1, started)> is started...
<Process(Process-2, started)> is started...
0 test_run_worker() on global_rank: 4 ,global_c: 1 ,counter_list: [tensor([1.]), tensor([0.]), tensor([1.])]
0 test_run_ps() on global_rank: 1 , global_c= 0 ,counter_list: [tensor([1.]), tensor([0.]), tensor([1.])]
1 test_run_worker() on global_rank: 4 ,global_c: 4 ,counter_list: [tensor([2.]), tensor([1.]), tensor([2.])]
2 test_run_worker() on global_rank: 4 ,global_c: 6 ,counter_list: [tensor([2.]), tensor([1.]), tensor([3.])]
1 test_run_ps() on global_rank: 1 , global_c= 3 ,counter_list: [tensor([2.]), tensor([1.]), tensor([2.])]
3 test_run_worker() on global_rank: 4 ,global_c: 9 ,counter_list: [tensor([3.]), tensor([2.]), tensor([4.])]
4 test_run_worker() on global_rank: 4 ,global_c: 10 ,counter_list: [tensor([3.]), tensor([2.]), tensor([5.])]
2 test_run_ps() on global_rank: 1 , global_c= 6 ,counter_list: [tensor([3.]), tensor([2.]), tensor([3.])]
3 test_run_ps() on global_rank: 1 , global_c= 9 ,counter_list: [tensor([4.]), tensor([3.]), tensor([4.])]
4 test_run_ps() on global_rank: 1 , global_c= 12 ,counter_list: [tensor([5.]), tensor([4.]), tensor([5.])]
/home/simon/anaconda3/lib/python3.6/multiprocessing/semaphore_tracker.py:146: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
len(cache))
/home/simon/anaconda3/lib/python3.6/multiprocessing/semaphore_tracker.py:146: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
len(cache))
Some code:
#Master Node:
#GLOO_SOCKET_IFNAME=enp0s31f6 python -m torch.distributed.launch torch_dist_test2.py --local_rank=0
#Slave Node:
#GLOO_SOCKET_IFNAME=enp7s0 python -m torch.distributed.launch torch_dist_test2.py --local_rank=1
import argparse
import time

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def init_dist_multi_process(world_size, global_rank, backend='gloo'):  # or 'nccl'
    dist.init_process_group(backend=backend,
                            init_method='tcp://192.168.1.12:23457',
                            world_size=world_size,
                            rank=global_rank)
def dist_broadcast(src, dic=None, tensor=None, async_op=True, p_device=torch.device('cpu')):
    # note: with async_op=True the returned work handle is not waited on here
    if dic is not None:
        for key, value in dic.items():
            dist.broadcast(tensor=torch.Tensor(value).to(p_device), src=src, async_op=async_op)
    else:
        dist.broadcast(tensor=tensor.to(p_device), src=src, async_op=async_op)
def test_run_ps(shared_coms):
    init_dist_multi_process(world_size=shared_coms.world_size, global_rank=shared_coms.server_rank, backend='gloo')
    counter_list = [torch.Tensor([0]) for _ in shared_coms.global_worker_rank_flatten_list]
    for _ in range(5):
        time.sleep(0.5)
        for r, gr in enumerate(shared_coms.global_worker_rank_flatten_list):
            dist_broadcast(src=gr, tensor=counter_list[r])
        global_c = sum([int(x) for x in counter_list])
        print(_, 'test_run_ps() on global_rank:', shared_coms.server_rank, ', global_c=', global_c, ',counter_list:', counter_list)
    print('test_run_ps() time up')
    time.sleep(5)
def test_run_worker(shared_coms, device_r):
    init_dist_multi_process(world_size=shared_coms.world_size, global_rank=shared_coms.worker_rank_list[device_r], backend='gloo')
    c = 0
    counter_list = [torch.Tensor([0]) for _ in shared_coms.global_worker_rank_flatten_list]
    for _ in range(5):
        time.sleep(0.25 * (1 + device_r))
        c += 1
        for r, gr in enumerate(shared_coms.global_worker_rank_flatten_list):
            if gr == shared_coms.global_worker_rank_list[shared_coms.server_rank][device_r]:
                counter_list[r] = torch.Tensor([c])
            dist_broadcast(src=gr, tensor=counter_list[r])
        global_c = sum([int(x) for x in counter_list])
        print(_, 'test_run_worker() on global_rank:', shared_coms.worker_rank_list[device_r], ',global_c:', global_c, ',counter_list:', counter_list)
    print('test_run_worker() time up')
    time.sleep(5)
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int, default=0)
    parser.add_argument("--global_rank_list", type=list, default=[[0, 2, 3], [1, 4]])
    parser.add_argument("--n_worker", type=int, default=16)
    args = parser.parse_args()

    server_rank = args.local_rank
    global_rank_list = args.global_rank_list
    world_size = sum([1 for y in global_rank_list for x in y])  # total number of processes (ps + workers)
    n_proc = len(global_rank_list[server_rank]) - 1             # number of worker processes on this node
    global_n_proc = world_size - len(global_rank_list)          # number of worker processes over all nodes
    n_server = len(global_rank_list)
    worker_rank_list = global_rank_list[server_rank][1:]        # worker ranks on this node
    global_worker_rank_list = [global_rank_list[x][1:] for x in range(len(global_rank_list))]
    global_worker_rank_flatten_list = []
    for x in global_worker_rank_list:
        global_worker_rank_flatten_list += x
    n_workers_per_slave = args.n_worker

    game = 'BreakoutNoFrameskip-v4'
    process_list = []
    # SharedComponents is defined elsewhere (not shown in this post); it carries the rank layout etc.
    shared_coms = SharedComponents(game, server_rank, global_rank_list, p_device=torch.device('cuda'))

    mp.set_start_method('spawn', force=True)
    p = mp.Process(target=test_run_ps, args=(shared_coms,))
    process_list.append(p)
    for device_r in range(n_proc):
        p = mp.Process(target=test_run_worker, args=(shared_coms, device_r))
        process_list.append(p)
    for p in process_list:
        p.start()
        print(p, ' is started...')
    for p in process_list:
        p.join()
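One thing I am not sure about: none of the processes above call dist.destroy_process_group() before exiting, and the async_op=True broadcasts are never explicitly waited on. In case that is related to the leaked semaphore, this is the kind of teardown I would try adding at the end of test_run_ps() and test_run_worker() (just a guess on my side):

    # (guess) explicit teardown at the end of each process function, not in my current code
    dist.barrier()                 # wait until every rank is done with the broadcast loop
    dist.destroy_process_group()   # tear down the gloo group before the process exits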