How to properly clean up a shared CUDA tensor in case of a CUDA error?

I’m trying to use a PyTorch shared CUDA tensor to test device P2P capability:

# run with `python test.py`
import json
import os
import sys
import tempfile
import time
from contextlib import contextmanager
from typing import Callable, Dict, List, Optional

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

import vllm.envs as envs
from vllm.logger import init_logger

logger = init_logger(__name__)


@contextmanager
def mute_output():
    with open(os.devnull, "w") as f:
        # sys.stderr = f
        # sys.stdout = f
        yield


def producer(i: int,
             init_method: str,
             consumer_done_event: mp.Event,
             result_queue: mp.Queue,
             cuda_visible_devices: Optional[str] = None,):
    if cuda_visible_devices is not None:
        os.environ["CUDA_VISIBLE_DEVICES"] = cuda_visible_devices
    with mute_output():
        dist.init_process_group(
            backend="gloo",
            init_method=init_method,
            world_size=2,
            rank=0,
        )
        # produce a tensor in GPU i
        data = torch.zeros((128, ), device=f"cuda:{i}")
        # get the information to reconstruct the shared tensor
        func, args = torch.multiprocessing.reductions.reduce_tensor(data)
        args = list(args)
        dist.broadcast_object_list([(func, args)], src=0)
        recv = [None]
        dist.broadcast_object_list(recv, src=1)
        success = recv[0]
        if success:
            dist.barrier()
            assert torch.all(data == 1).item()
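        # keep the producer alive until the parent process signals that the
        # consumer has fully exited, so the shared CUDA tensor is not torn
        # down while the consumer might still hold a reference to it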
        consumer_done_event.wait()
        result_queue.put(success)

def consumer(j: int,
             init_method: str,
             cuda_visible_devices: Optional[str] = None):
    if cuda_visible_devices is not None:
        os.environ["CUDA_VISIBLE_DEVICES"] = cuda_visible_devices
    with mute_output():
        dist.init_process_group(
            backend="gloo",
            init_method=init_method,
            world_size=2,
            rank=1,
        )
        torch.cuda.set_device(j)
        recv = [None]
        dist.broadcast_object_list(recv, src=0)
        func: Callable
        args: List
        func, args = recv[0]  # type: ignore
        # `args[6]` is the device id
        # by default pytorch will use `i` from the producer
        # here we need to set it to `j` to test P2P access
        args[6] = j
        success = False
        try:
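            # rebuild the tensor from the IPC handle on GPU j; this is where
            # a CUDA error is raised if P2P access is not actually possible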
            data = func(*args)
            data += 1
            success = True
        except Exception:
            pass
        dist.broadcast_object_list([success], src=1)
        if success:
            torch.cuda.synchronize()
            dist.barrier()
            assert torch.all(data == 1).item()
            del data
        del func, args, recv
        import gc
        gc.collect()
        torch.cuda.empty_cache()

def can_actually_p2p(i, j):
    """
    Usually, checking if P2P access is enabled can be done by
    `torch.cuda.can_device_access_peer(i, j)`. However, sometimes
    the driver might be broken, and `torch.cuda.can_device_access_peer(i, j)`
    returns `True` even if P2P access is not actually possible.
    See https://github.com/vllm-project/vllm/issues/2728 and
    https://forums.developer.nvidia.com/t/direct-gpu-gpu-communication-does-not-seem-to-work-properly/283264/10
    Therefore, we have to perform a real P2P access to check if it is actually
    possible.

    Note on p2p and cuda IPC:
    Usually, one process uses one GPU:
    GPU i --> cuda context i --> tensor i --> process i

    We need to combine p2p and cuda IPC, so that:
    GPU i --> cuda context i --> tensor i --> process i
                                 |shared|
    GPU j --> cuda context j --> tensor j --> process j
    That is to say, process i creates a tensor in GPU i, passes IPC handle to
    process j, and process j accesses the tensor in GPU j. Any operation on the
    tensor in process j will be reflected in the tensor in process i, because
    they are the same memory segment.
    It is important to note that process j accesses the tensor in GPU j, not
    GPU i. That's why we need p2p access. # noqa
    """
    cuda_visible_devices = os.getenv('CUDA_VISIBLE_DEVICES', None)
    # pass the CUDA_VISIBLE_DEVICES to the child process
    # to make sure they see the same set of GPUs

    # make sure the temp file is not the same across different calls
    temp_path = tempfile.mktemp() + str(time.time())
    # create an empty file
    with open(temp_path, "w"):
        pass
    init_method = f"file://{temp_path}"

    # make sure the processes are spawned
    smp = mp.get_context("spawn")
    consumer_done_event = smp.Event()
    result_queue = smp.Queue()
    pi = smp.Process(target=producer,
                     args=(i, init_method, consumer_done_event, result_queue, cuda_visible_devices, ))
    pj = smp.Process(target=consumer,
                     args=(j, init_method, cuda_visible_devices, ))
    pi.start()
    pj.start()
    pj.join()
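    # only release the producer after the consumer process has fully
    # terminated, so the shared tensor is freed in the right order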
    consumer_done_event.set()
    pi.join()
    return result_queue.get()

if __name__ == '__main__':
    print(can_actually_p2p(0, 1))

It works when the two GPUs have P2P capability, but produces a warning message if they don't:

[rank0]:[W CudaIPCTypes.cpp:16] Producer process has been terminated before all shared CUDA tensors released. See Note [Sharing CUDA tensors]
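
As far as I understand the "Sharing CUDA tensors" note the warning points to, it normally fires when the producer process exits while a consumer still holds (or never released) the shared tensor. Here is an untested minimal sketch of that textbook failure mode, just to illustrate what I think the warning is about (this is not my actual code):

# untested sketch: the producer exits right after sending the tensor,
# while the consumer still holds it -- my understanding is that this is
# the situation the warning complains about
import time

import torch
import torch.multiprocessing as mp


def early_exit_producer(q):
    t = torch.zeros(16, device="cuda:0")
    q.put(t)  # the tensor is sent via CUDA IPC under the hood
    # return immediately instead of waiting for the consumer to release it


def holding_consumer(q):
    t = q.get()
    time.sleep(5)  # keep a reference long after the producer is gone
    del t


if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    q = ctx.Queue()
    p = ctx.Process(target=early_exit_producer, args=(q,))
    c = ctx.Process(target=holding_consumer, args=(q,))
    p.start()
    c.start()
    p.join()
    c.join()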

This is strange, though: in my script I already take care of the process termination order (the producer only exits after the consumer process has fully terminated).

One hypothesis is that when the consumer hits an error, and because it is a CUDA error, the consumer process can no longer release the shared CUDA memory cleanly before it exits.
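
If that is what happens, I would expect the consumer to need some explicit teardown before it exits. Below is only a sketch of what I have in mind; I'm assuming torch.cuda.ipc_collect() is the relevant hook here, and I'm not sure it can still do anything once the CUDA context has hit an error:

import gc

import torch


def try_release_ipc_state() -> bool:
    """Best-effort cleanup after a failed rebuild in the consumer.
    Returns True if the CUDA context still responds afterwards."""
    gc.collect()                  # drop any lingering Python references
    torch.cuda.empty_cache()      # return cached blocks to the driver
    torch.cuda.ipc_collect()      # ask PyTorch to clean up CUDA IPC bookkeeping
    try:
        torch.cuda.synchronize()  # does the context still answer after the error?
        return True
    except RuntimeError:
        # the earlier CUDA error apparently poisoned the context, so the
        # shared memory probably cannot be released cleanly anymore
        return False

If the failed rebuild really leaves the consumer's CUDA context unusable, the consumer can never decrement the producer's reference count, which would explain why the producer still complains when it exits.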

Any idea on how I can get rid of the warning message? :pray: