Multiprocessing on distributed multi-node shutdown error: 'spawn' on slave node leads to semaphore_tracker leak

Hi Masters,

I am running the following code on 2 nodes with different numbers of CPU/GPU devices: one parameter server (ps) process and a different number of worker processes on each node (e.g. global_ranks: [[0 (ps), 2 (worker), 3 (worker)], [1 (ps), 4 (worker)]]).
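
To make the layout concrete, here is a small sketch of the values the __main__ block below computes for this example layout (just the example ranks, nothing new):

global_rank_list = [[0, 2, 3], [1, 4]]   # [ps, worker, worker] on node 0, [ps, worker] on node 1
world_size = sum(1 for node in global_rank_list for _ in node)                        # 5 processes total
server_ranks = [node[0] for node in global_rank_list]                                 # [0, 1]
global_worker_rank_flatten_list = [r for node in global_rank_list for r in node[1:]]  # [2, 3, 4]
print(world_size, server_ranks, global_worker_rank_flatten_list)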

For CUDA-initialization reasons I set mp.set_start_method('spawn', force=True) on the slave node, which leads to the following crash (not just a warning):
/home/simon/anaconda3/lib/python3.6/multiprocessing/semaphore_tracker.py:146: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
len(cache))
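
For context, the constraint that forced me onto 'spawn' looks roughly like this; a minimal, self-contained sketch (cuda_child is an illustrative name, not part of the code below):

import torch
import torch.multiprocessing as mp

def cuda_child(rank):
    # Touching CUDA in a child is only safe with the 'spawn' start method;
    # with 'fork', PyTorch refuses to re-initialize CUDA in the subprocess.
    print(rank, torch.zeros(1, device='cuda'))

if __name__ == '__main__':
    mp.set_start_method('spawn', force=True)  # same call as in the full script
    procs = [mp.Process(target=cuda_child, args=(r,)) for r in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()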

Could somebody help? Thanks in advance.

Results:

$Slave Node
<Process(Process-1, started)>  is started...
<Process(Process-2, started)>  is started...
0 test_run_worker() on global_rank: 4 ,global_c: 1 ,counter_list: [tensor([1.]), tensor([0.]), tensor([1.])]
0 test_run_ps() on global_rank: 1 , global_c= 0 ,counter_list: [tensor([1.]), tensor([0.]), tensor([1.])]
1 test_run_worker() on global_rank: 4 ,global_c: 4 ,counter_list: [tensor([2.]), tensor([1.]), tensor([2.])]
2 test_run_worker() on global_rank: 4 ,global_c: 6 ,counter_list: [tensor([2.]), tensor([1.]), tensor([3.])]
1 test_run_ps() on global_rank: 1 , global_c= 3 ,counter_list: [tensor([2.]), tensor([1.]), tensor([2.])]
3 test_run_worker() on global_rank: 4 ,global_c: 9 ,counter_list: [tensor([3.]), tensor([2.]), tensor([4.])]
4 test_run_worker() on global_rank: 4 ,global_c: 10 ,counter_list: [tensor([3.]), tensor([2.]), tensor([5.])]
2 test_run_ps() on global_rank: 1 , global_c= 6 ,counter_list: [tensor([3.]), tensor([2.]), tensor([3.])]
3 test_run_ps() on global_rank: 1 , global_c= 9 ,counter_list: [tensor([4.]), tensor([3.]), tensor([4.])]
4 test_run_ps() on global_rank: 1 , global_c= 12 ,counter_list: [tensor([5.]), tensor([4.]), tensor([5.])]
/home/simon/anaconda3/lib/python3.6/multiprocessing/semaphore_tracker.py:146: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))
/home/simon/anaconda3/lib/python3.6/multiprocessing/semaphore_tracker.py:146: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))

Some code:

#Master Node:
#GLOO_SOCKET_IFNAME=enp0s31f6 python -m torch.distributed.launch torch_dist_test2.py --local_rank=0
#Slave Node:
#GLOO_SOCKET_IFNAME=enp7s0 python -m torch.distributed.launch torch_dist_test2.py --local_rank=1

import argparse
import time

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def init_dist_multi_process(world_size, global_rank, backend='gloo'):  # or 'nccl'
    # Every process (ps and workers) joins the same global process group.
    dist.init_process_group(backend=backend,
                            init_method='tcp://192.168.1.12:23457',
                            world_size=world_size,
                            rank=global_rank)

def dist_broadcast(src, dic=None, tensor=None, async_op=True, p_device=torch.device('cpu')):
    # Broadcast either every tensor in a state dict or a single tensor from rank `src`.
    if dic is not None:
        for key, value in dic.items():
            dist.broadcast(tensor=torch.Tensor(value).to(p_device), src=src, async_op=async_op)
    else:
        dist.broadcast(tensor=tensor.to(p_device), src=src, async_op=async_op)

def test_run_ps(shared_coms):
    init_dist_multi_process(world_size=shared_coms.world_size, global_rank=shared_coms.server_rank, backend='gloo')

    counter_list = [torch.Tensor([0]) for _ in shared_coms.global_worker_rank_flatten_list]
    for _ in range(5):
        time.sleep(0.5)
        for r, gr in enumerate(shared_coms.global_worker_rank_flatten_list):##
            dist_broadcast(src=gr,tensor=counter_list[r])
        global_c = sum([int(x) for x in counter_list])
        print(_,'test_run_ps() on global_rank:',shared_coms.server_rank,', global_c=',global_c,',counter_list:',counter_list)  

    print('test_run_ps() time up')
    time.sleep(5)

def test_run_worker(shared_coms, device_r):
    init_dist_multi_process(world_size=shared_coms.world_size, global_rank=shared_coms.worker_rank_list[device_r], backend='gloo')

    c = 0
    counter_list = [torch.Tensor([0]) for _ in shared_coms.global_worker_rank_flatten_list]

    for _ in range(5):
        time.sleep(0.25*(1+device_r))
        c+=1
        i=0
        for r, gr in enumerate(shared_coms.global_worker_rank_flatten_list):##
            if gr == shared_coms.global_worker_rank_list[shared_coms.server_rank][device_r]:
                counter_list[r] = torch.Tensor([c])
            dist_broadcast(src=gr,tensor=counter_list[r])
        global_c = sum([int(x) for x in counter_list])
        print(_,'test_run_worker() on global_rank:',shared_coms.worker_rank_list[device_r],',global_c:',global_c,',counter_list:',counter_list)
    
    print('test_run_worker() time up')
    time.sleep(5)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int, default=0)
    parser.add_argument("--global_rank_list", type=list, default=[[0,2,3],[1,4]])
    parser.add_argument("--n_worker", type=int, default=16)
    args = parser.parse_args()


    server_rank = args.local_rank
    global_rank_list = args.global_rank_list
    world_size = sum([1 for y in global_rank_list for x in y])
    n_proc = len(global_rank_list[server_rank])-1
    global_n_proc = world_size - len(global_rank_list)
    n_server = len(global_rank_list)
    worker_rank_list = global_rank_list[server_rank][1:]
    global_worker_rank_list = [global_rank_list[x][1:] for x in range(len(global_rank_list))]
    global_worker_rank_flatten_list = []
    for x in global_worker_rank_list: global_worker_rank_flatten_list+=x

    n_workers_per_slave = args.n_worker
    game = 'BreakoutNoFrameskip-v4'
    process_list = []

    shared_coms = SharedComponents(game, server_rank, global_rank_list, p_device=torch.device('cuda'))  # defined elsewhere (not shown)

    mp.set_start_method('spawn', force=True)

    p = mp.Process(target=test_run_ps, args=(shared_coms,))
    process_list.append(p)

    for device_r in range(n_proc):##
        p = mp.Process(target=test_run_worker, args=(shared_coms, device_r))
        process_list.append(p)

    for p in process_list:
        p.start()
        print(p,' is started...')

    for p in process_list:
        p.join()

Hi! Hard to tell where this is going wrong. The warning from multiprocessing happens in the parent process (I think) and doesn’t pinpoint where the crash is happening. It looks like neither process gets to log the time up message, so do they even break out of their loops?
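
One quick way to check, assuming the process_list from your script: print each child's exit code after joining, and wrap the targets so hidden exceptions get printed (wrapped_worker is just an illustrative name):

import traceback

def wrapped_worker(shared_coms, device_r):
    # Hypothetical wrapper around test_run_worker to surface exceptions
    # that would otherwise vanish inside the spawned child.
    try:
        test_run_worker(shared_coms, device_r)
    except Exception:
        traceback.print_exc()
        raise

for p in process_list:
    p.join()
    # A non-zero exitcode (negative means killed by a signal) points at the
    # child that actually crashed.
    print(p, 'exitcode =', p.exitcode)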

Hi Pieter,

You are right; either they never break out of their loops, or the warning messages only show up after the crash.
Any way to debug? Thank you in advance.

I added logger = multiprocessing.log_to_stderr() and logger.setLevel(multiprocessing.SUBDEBUG) to the demo and got the following output:

[INFO/Process-2] process shutting down
[DEBUG/Process-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-2] running the remaining "atexit" finalizers
[INFO/Process-2] process exiting with exitcode 0
/home/simon/anaconda3/lib/python3.6/multiprocessing/semaphore_tracker.py:146: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
/home/simon/anaconda3/lib/python3.6/multiprocessing/semaphore_tracker.py:146: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers
[Level 5/MainProcess] calling <Finalize object, dead>
[Level 5/MainProcess] finalizer calling <function rmtree at 0x7fab0b4bfea0> with args ['/tmp/pymp-hfadelk_'] and kwargs {}
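
For reference, the logging setup was roughly the following; note that on Python 3 the SUBDEBUG level lives in multiprocessing.util, so this sketch uses that path:

import multiprocessing
import multiprocessing.util

logger = multiprocessing.log_to_stderr()
logger.setLevel(multiprocessing.util.SUBDEBUG)  # SUBDEBUG == 5, hence the "Level 5" lines above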

Hard to say. I did a quick search and came up with https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming, which might be useful here. It’s a warning that comes from deep in the guts of multiprocessing, so I’d start there.
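
One of the guidelines there is to release resources explicitly before a process exits. In your script that might mean tearing down the process group at the end of each target; just a guess, sketched here with a hypothetical wrapper (run_with_cleanup is not from your code):

import torch.distributed as dist

def run_with_cleanup(target, *args):
    # Run one of the existing targets, then explicitly release the gloo
    # process group before the child process shuts down.
    try:
        target(*args)
    finally:
        if dist.is_initialized():
            dist.destroy_process_group()

# e.g. mp.Process(target=run_with_cleanup, args=(test_run_ps, shared_coms))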