Sharing memory through multiprocessing.Queue introduces extra memory overhead

context

I want to share a tensor between multiple processes so that there is only one copy of the tensor in GPU memory.

After reading Reuse buffers passed through a Queue, I thought I could share memory through a queue.

I’m running the program below on my machine:

import pynvml
import torch
import torch.multiprocessing as mp
import logging
import argparse

MiB = 1024 ** 2

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s.%(msecs)03d %(levelname)s - %(funcName)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

_logger = logging.getLogger(__name__)


def get_memory_info_MiB(gpu_index):
    pynvml.nvmlInit()
    handle = pynvml.nvmlDeviceGetHandleByIndex(int(gpu_index))
    mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
    return (mem_info.free // MiB, mem_info.used // MiB, mem_info.total // MiB)


def spawn(nprocs, fn):
    ctx = mp.get_context("spawn")
    qu = ctx.Queue()
    barrier = ctx.Barrier(nprocs)

    procs = []
    for ii in range(nprocs):
        p = ctx.Process(target=fn, args=(ii, qu, barrier))
        p.start()
        procs.append(p)

    for p in procs:
        p.join()


def leader__(rank, q, barrier):
    _logger.info(
        "%d before creating tensor, free %s, used %s, total %s MiB",
        rank,
        *get_memory_info_MiB(0),
    )

    tt = torch.zeros(
        (1024 * 1024 * 512, 1),
        dtype=torch.float32,
        device=torch.device("cuda:0"),
    )

    _logger.info(
        "%d after creating tensor, free %s, used %s, total %s MiB",
        rank,
        *get_memory_info_MiB(0),
    )

    # make sure the memory is allocated
    barrier.wait()

    # hand the tensor to each follower; a CUDA tensor goes through the queue as an IPC handle, not a copy
    for ii in range(1, barrier.parties):
        q.put(tt)
        _logger.info(
            "put from %d to %d. free %s, used %s, total %s MiB",
            rank,
            ii,
            *get_memory_info_MiB(0),
        )

    barrier.wait()
    # each follower wrote its rank into row `rank`; row 0 still holds its initial zero
    for ii in range(barrier.parties):
        assert tt[ii, 0] == ii
    _logger.info("%s mem after barrier %s", rank, get_memory_info_MiB(0))

    # drop the local reference and return the cached block to the driver
    del tt
    torch.cuda.empty_cache()
    barrier.wait()
    _logger.info("%s mem after empty_cache %s", rank, get_memory_info_MiB(0))


def follower(rank, q, barrier):
    _logger.info("%s initial mem %s", rank, get_memory_info_MiB(0))

    barrier.wait()
    _logger.info("%d mem before get %s", rank, get_memory_info_MiB(0))

    # the received tensor maps the leader's allocation via CUDA IPC; the data is not copied
    tt = q.get()
    tt[rank, 0] = rank
    _logger.debug("%d, value: %s, data_ptr: %s", rank, tt[rank, 0], tt.data_ptr())

    _logger.info("%d mem after get %s", rank, get_memory_info_MiB(0))

    barrier.wait()
    _logger.info("%d mem after barrier %s", rank, get_memory_info_MiB(0))

    del tt
    torch.cuda.empty_cache()
    barrier.wait()
    _logger.info("%d mem after empty_cache %s", rank, get_memory_info_MiB(0))


def test_func(rank, q, barrier):
    if rank == 0:
        leader__(rank, q, barrier)
    else:
        follower(rank, q, barrier)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-n", "--nprocs", type=int, default=1)
    args = parser.parse_args()
    spawn(args.nprocs, test_func)

experiment

I created a 2 GB tensor and shared it with 2 other processes. It turns out I end up using about 6 GB of GPU memory, and even after I delete the 2 GB tensor and empty the cache, about 4 GB is still in use.

Below is the output:

 python mp_mem_info.py -n 3
2022-05-26 00:57:51.063 INFO - follower: 2 initial mem (32480, 0, 32480)
2022-05-26 00:57:51.068 INFO - follower: 1 initial mem (32480, 0, 32480)
2022-05-26 00:57:51.119 INFO - leader__: 0 before creating tensor, free 32480, used 0, total 32480 MiB
2022-05-26 00:57:54.884 INFO - leader__: 0 after creating tensor, free 28975, used 3504, total 32480 MiB
2022-05-26 00:57:54.885 INFO - follower: 1 mem before get (28975, 3504, 32480)
2022-05-26 00:57:54.886 INFO - follower: 2 mem before get (28975, 3504, 32480)
2022-05-26 00:57:54.907 INFO - leader__: put from 0 to 1. free 28975, used 3504, total 32480 MiB
2022-05-26 00:57:54.907 INFO - leader__: put from 0 to 2. free 28975, used 3504, total 32480 MiB
2022-05-26 00:57:58.628 INFO - follower: 1 mem after get (26117, 6362, 32480)
2022-05-26 00:57:58.752 INFO - follower: 2 mem after get (26081, 6398, 32480)
2022-05-26 00:57:58.754 INFO - follower: 2 mem after barrier (26081, 6398, 32480)
2022-05-26 00:57:58.753 INFO - follower: 1 mem after barrier (26081, 6398, 32480)
2022-05-26 00:57:58.757 INFO - leader__: 0 mem after barrier (26083, 6396, 32480)
2022-05-26 00:57:58.765 INFO - follower: 2 mem after empty_cache (28133, 4346, 32480)
2022-05-26 00:57:58.765 INFO - leader__: 0 mem after empty_cache (28133, 4346, 32480)
2022-05-26 00:57:58.766 INFO - follower: 1 mem after empty_cache (28133, 4346, 32480)

So the memory overhead per process is 4346 / 3 ≈ 1.4 GB.
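A further check on these numbers (a sketch that only uses values from the log above) suggests the followers are not holding extra copies of the tensor data: the growth per follower when it calls q.get() is roughly the size of a per-process baseline, not the size of the 2 GB tensor.

# Figures in MiB, taken from the 3-process log above.
used_after_alloc = 3504   # leader only, tensor allocated
used_after_gets = 6398    # after both followers called q.get()

tensor_mib = 1024 * 1024 * 512 * 4 // (1024 ** 2)   # float32 -> 2048 MiB

# Growth per follower is ~1447 MiB, well below the 2048 MiB tensor,
# so q.get() did not duplicate the tensor's data.
print((used_after_gets - used_after_alloc) / 2)   # ~1447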

If I run a single process:

python mp_mem_info.py -n 1
2022-05-26 00:58:51.109 INFO - leader__: 0 before creating tensor, free 32480, used 0, total 32480 MiB
2022-05-26 00:58:54.876 INFO - leader__: 0 after creating tensor, free 28975, used 3504, total 32480 MiB
2022-05-26 00:58:54.879 INFO - leader__: 0 mem after barrier (28973, 3506, 32480)
2022-05-26 00:58:54.882 INFO - leader__: 0 mem after empty_cache (31023, 1456, 32480)

This matches the 3-process case: the extra overhead for each process is around 1.4 GB.
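The single-process run decomposes the same way. Below is a sketch of that accounting, assuming the ~1.4 GB that remains after freeing the tensor is per-process CUDA runtime state rather than tensor data:

# Single-process run, figures in MiB from the log above.
tensor_mib = 2048               # 1024 * 1024 * 512 float32 elements
used_after_alloc = 3504         # tensor + per-process baseline
used_after_empty_cache = 1456   # tensor freed, baseline remains

baseline = used_after_alloc - tensor_mib
assert baseline == used_after_empty_cache   # 1456 MiB, i.e. ~1.4 GB per process

Three such baselines (about 4.3 GB) also account for the residual memory seen in the 3-process run above.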

conclusion

According to this comment, it seems this overhead can’t be avoided.
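One way to sanity-check that the fixed cost comes from merely initializing CUDA in each process, rather than from the queue itself, is a standalone script like the sketch below. It only touches the GPU once and reads NVML before and after; it assumes no other process is allocating GPU memory at the same time.

import pynvml
import torch

MiB = 1024 ** 2

pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(0)
before = pynvml.nvmlDeviceGetMemoryInfo(handle).used // MiB

# Touching the device once is enough to create this process's CUDA context.
_ = torch.ones(1, device="cuda:0")
torch.cuda.synchronize()

after = pynvml.nvmlDeviceGetMemoryInfo(handle).used // MiB
print(f"per-process CUDA baseline: ~{after - before} MiB")

If the overhead really is the per-process CUDA context, this should print a number close to the ~1.4 GB observed above.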

If I switch to multi-threading, I don’t see this extra memory overhead:

import pynvml
import torch
import threading as mp
import queue
import logging
import argparse

MiB = 1024 ** 2

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s.%(msecs)03d %(levelname)s - %(funcName)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

_logger = logging.getLogger(__name__)


def get_memory_info_MiB(gpu_index):
    pynvml.nvmlInit()
    handle = pynvml.nvmlDeviceGetHandleByIndex(int(gpu_index))
    mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
    return (mem_info.free // MiB, mem_info.used // MiB, mem_info.total // MiB)


def spawn(nprocs, fn):
    qu = queue.SimpleQueue()
    barrier = mp.Barrier(nprocs)

    procs = []
    for ii in range(nprocs):
        p = mp.Thread(target=fn, args=(ii, qu, barrier), daemon=True)
        p.start()
        procs.append(p)

    for p in procs:
        p.join()


def leader(rank, q, barrier):
    _logger.info("%d memory allocated %s", rank, torch.cuda.memory_allocated(0) // MiB)
    tt = torch.zeros(
        (1024 * 1024 * 512, 1),
        dtype=torch.float32,
        device=torch.device("cuda:0"),
    )
    _logger.info("%d memory allocated %s", rank, torch.cuda.memory_allocated(0) // MiB)
    # share_memory_() is a no-op for CUDA tensors (and unnecessary between threads anyway)
    tt.share_memory_()

    # make sure the memory is allocated
    barrier.wait()

    for ii in range(1, barrier.parties):
        q.put(tt)
        _logger.info(
            "put from %d to %d. free %s, used %s, total %s",
            rank,
            ii,
            *get_memory_info_MiB(0),
        )

    barrier.wait()
    for ii in range(barrier.parties):
        assert tt[ii, 0] == ii
    _logger.info("%s free mem after barrier %s", rank, get_memory_info_MiB(0))

    del tt
    torch.cuda.empty_cache()
    barrier.wait()
    _logger.info("%s free mem after empty_cache %s", rank, get_memory_info_MiB(0))


def follower(rank, q, barrier):
    _logger.info("%s free mem %s", rank, get_memory_info_MiB(0))
    barrier.wait()
    _logger.info(
        "%d memory allocated before get %s", rank, torch.cuda.memory_allocated(0)
    )
    _logger.info("%d free mem before get %s", rank, get_memory_info_MiB(0))
    tt = q.get()
    _logger.info(
        "%d memory allocated after get %s", rank, torch.cuda.memory_allocated(0)
    )
    tt[rank, 0] = rank
    _logger.info("%d, value: %s, data_ptr: %s", rank, tt[rank, 0], tt.data_ptr())
    _logger.info("%d free mem after get %s", rank, get_memory_info_MiB(0))

    # decrement the ref-cnt
    # del tt
    barrier.wait()
    _logger.info("%d free mem after barrier %s", rank, get_memory_info_MiB(0))

    del tt
    torch.cuda.empty_cache()
    barrier.wait()
    _logger.info("%d free mem after empty_cache %s", rank, get_memory_info_MiB(0))


def test_func(rank, q, barrier):
    if rank == 0:
        leader(rank, q, barrier)
    else:
        follower(rank, q, barrier)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-n", "--nprocs", type=int, default=1)
    args = parser.parse_args()
    spawn(args.nprocs, test_func)

and the output is:

python threading.py -n 3
2022-05-26 01:14:42.381 INFO - leader: 0 memory allocated 0
2022-05-26 01:14:42.980 INFO - follower: 2 free mem (32456, 23, 32480)
2022-05-26 01:14:42.981 INFO - follower: 1 free mem (32456, 23, 32480)
2022-05-26 01:14:46.256 INFO - leader: 0 memory allocated 2048
2022-05-26 01:14:46.262 INFO - follower: 1 memory allocated before get 2147483648
2022-05-26 01:14:46.263 INFO - follower: 2 memory allocated before get 2147483648
2022-05-26 01:14:46.263 INFO - leader: put from 0 to 1. free 28975, used 3504, total 32480
2022-05-26 01:14:46.264 INFO - follower: 1 free mem before get (28975, 3504, 32480)
2022-05-26 01:14:46.264 INFO - follower: 1 memory allocated after get 2147483648
2022-05-26 01:14:46.265 INFO - follower: 2 free mem before get (28975, 3504, 32480)
2022-05-26 01:14:46.265 INFO - leader: put from 0 to 2. free 28975, used 3504, total 32480
2022-05-26 01:14:46.267 INFO - follower: 1, value: tensor(1., device='cuda:0'), data_ptr: 139629051248640
2022-05-26 01:14:46.267 INFO - follower: 2 memory allocated after get 2147483648
2022-05-26 01:14:46.276 INFO - follower: 2, value: tensor(2., device='cuda:0'), data_ptr: 139629051248640
2022-05-26 01:14:46.276 INFO - follower: 1 free mem after get (28973, 3506, 32480)
2022-05-26 01:14:46.278 INFO - follower: 2 free mem after get (28973, 3506, 32480)
2022-05-26 01:14:46.278 INFO - follower: 2 free mem after barrier (28973, 3506, 32480)
2022-05-26 01:14:46.279 INFO - follower: 1 free mem after barrier (28973, 3506, 32480)
2022-05-26 01:14:46.279 INFO - leader: 0 free mem after barrier (28975, 3504, 32480)
2022-05-26 01:14:46.283 INFO - leader: 0 free mem after empty_cache (31023, 1456, 32480)
2022-05-26 01:14:46.283 INFO - follower: 1 free mem after empty_cache (31023, 1456, 32480)
2022-05-26 01:14:46.284 INFO - follower: 2 free mem after empty_cache (31023, 1456, 32480)

Notice that the underlying data_ptr of the tensor shared through the queue is the same for both threads. The overhead for this single process is still about 1.4 GB.
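For reference, a minimal single-process sketch of why the threading version shares one allocation: a plain queue.SimpleQueue hands back the very same tensor object, so both names point at the same CUDA storage (the variable names here are illustrative).

import queue
import torch

q = queue.SimpleQueue()
src = torch.zeros(4, device="cuda:0")
q.put(src)

dst = q.get()
# Within one process the queue returns the same Python object,
# so both names refer to a single CUDA allocation.
assert dst is src
assert dst.data_ptr() == src.data_ptr()

dst[1] = 1.0
print(src[1])  # tensor(1., device='cuda:0') -- the write is visible through src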