Shared memory with torch.multiprocessing

Hey folks,

I have a server with a large amount of RAM but slow storage, and I want to speed up training by keeping my dataset in RAM. I also use DDP, which means there are multiple processes (one per GPU). On top of that, I use several workers (num_workers > 0) in my dataloader, so a plain Python list as a cache would be duplicated in every worker process, which eats up a lot of memory.

The natural solution is to use shared memory, and this is how I use it:

  1. In the launching process, do:
if __name__ == '__main__':
    import argparse
    import os
    import ctypes

    import numpy as np
    import torch.multiprocessing as mp

    # One byte per pixel for 80000 RGB images of size 256x256.
    shared_base = mp.Array(ctypes.c_byte, 80000*3*256*256, lock=True)
    with shared_base.get_lock():
        shared_array = np.ctypeslib.as_array(shared_base.get_obj())
        img_cache = shared_array.reshape(80000, 256, 256, 3)

    # Single shared flag; a negative value means "cache not filled yet".
    use_cache = mp.Array(ctypes.c_float, 1, lock=False)
    use_cache[0] = -3.14
  2. This cache is passed to each spawned process:
mp.spawn(main, nprocs=ngpus_per_node, args=(args, img_cache, use_cache))
  3. Each process takes this shared memory and hands it to a dataset object:
dset = SVAE_FFHQ(args.data_folder, transform, 32, 64, args.hidden_size, img_cache, use_cache)

  4. The SVAE_FFHQ class looks like this (a sketch of when the flag gets flipped follows the class):
class SVAE_FFHQ(data.Dataset):
    def __init__(self, root_dir, transform=None, top_size=32, bottom_size=64, dim=256, img_cache=None, use_cache=None):
        super().__init__()
        ...
        self.img_cache = img_cache
        self.use_cache = use_cache


    def _use_cache(self):
        self.use_cache[0] = 3.14
        print('Using cache')

    def __getitem__(self, idx):
        path, lbl = self.dset.samples[idx]

        # While the flag is negative the cache is still being filled:
        # decode from disk and store the image in the shared array.
        if self.use_cache[0] < 0:
            with open(path, 'rb') as f:
                img = Image.open(f)
                img = img.convert('RGB')
                img = img.resize((256, 256), Image.LANCZOS)

            self.img_cache[idx] = deepcopy(np.asarray(img))
            del img

        # Always serve the sample out of the shared cache.
        return self.transform(Image.fromarray(self.img_cache[idx], 'RGB'))
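
The idea is to call _use_cache() once every index has been cached, e.g. after the first full epoch. Roughly like this in the launch-side training loop (sketch only; the real loop has the usual DDP boilerplate):

# inside main(), after dset and its dataloader have been built
for epoch in range(args.epochs):
    for batch in loader:
        ...  # usual forward / backward
    if epoch == 0:
        # sets use_cache[0] = 3.14, so from now on every worker in every
        # process reads images from the shared cache instead of the disk
        dset._use_cache()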

This seems fine to me, but what actually happens is:

  1. The shared memory appears to be pickled rather than shared across the spawned processes, which means my memory requirements grow with the number of processes.
  2. This isn’t any faster than reading data off the slow HDDs.

Any insight into these problems?

Thanks!

  1. I cannot reproduce your first problem with the code snippet below; the memory is not pickled. As you can see in the screenshot, only the main process holds the 1 GB shared memory array:
import ctypes
import time
import numpy as np
import multiprocessing as mp


def subproc(array):
    with array.get_lock():
        np_array = np.ctypeslib.as_array(array.get_obj())
        print(np_array[1000])

    # keep process showing in "top"
    begin = time.time()
    while time.time() - begin < 10:
        a = 1 * 100


if __name__ == "__main__":
    array = mp.Array(ctypes.c_byte, 1000*1024*1024, lock=True)
    with array.get_lock():
        np_array = np.ctypeslib.as_array(array.get_obj())
        np_array.fill(100)
    print("allocated")
    p = mp.Process(target=subproc, args=(array,))
    p2 = mp.Process(target=subproc, args=(array,))
    p.start()
    p2.start()
    print("started")

    # keep process showing in "top"
    begin = time.time()
    while time.time() - begin < 10:
        a = 1 * 100
    p.join()
    p2.join()
    print("joined")

[screenshot: top output showing the 1 GB allocation only in the main process]

  2. I think the second problem might be related to Image.fromarray; see this issue.
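
If that is indeed the bottleneck, one possible workaround (rough sketch; it assumes your cache slots hold HWC image bytes and that your transform can be expressed on tensors) is to skip the PIL round-trip on cache hits:

import numpy as np
import torch

def cached_item(img_cache, idx):
    # Reinterpret the c_byte (int8) cache slot as uint8 without copying,
    # then build a CHW float tensor in [0, 1] directly, avoiding
    # Image.fromarray + the PIL-based transform on every access.
    arr = img_cache[idx].view(np.uint8)
    return torch.from_numpy(arr.copy()).permute(2, 0, 1).float() / 255.0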

Thanks for taking the time to implement this yourself.

I must ask a couple of questions though:

  1. Which version of Python did you use? I know that pickling changed in Python 3.8 (pickle protocol 5 with out-of-band buffers).
  2. I use mp.spawn to start the processes, where mp is imported from torch.multiprocessing.

Could that be a problem?

Anyways this looks interesting. Thanks again.

I am using the default Python 3.5.2 installation from Ubuntu 16.04.

I have added mp.set_start_method("spawn") and still cannot reproduce your issue; it would help if you could share a minimal problematic code snippet :blush:

Thanks again.

Let me also try your code on my machine and put together a minimal example that I can share here.

This is the code I ran:

import ctypes
import time
import numpy as np
import torch.multiprocessing as mp

def subproc2(gpu, array):
    with array.get_lock():
        np_array = np.ctypeslib.as_array(array.get_obj())
        print(np_array[1000])
        if gpu == 0:
            np_array[999] = 0
        elif gpu == 1:
            np_array[1000] = 1 

    # keep process showing in "top"
    begin = time.time()
    while time.time() - begin < 10:
        a = 1 * 100

    return 0

if __name__ == "__main__":
    mp.set_start_method('spawn')
    array = mp.Array(ctypes.c_byte, 1000*1024*1024, lock=True)
    with array.get_lock():
        np_array = np.ctypeslib.as_array(array.get_obj())
        np_array.fill(100)

    print("allocated")
    mp.spawn(subproc2, args=(array,), nprocs=2)

    # check that the children's writes (indices 999 and 1000) are visible here
    print(np_array[999:1001])

    # keep process showing in "top"
    begin = time.time()
    while time.time() - begin < 100:
        a = 1 * 100
    
    print('done')

The observations are:

  1. Memory is only ever allocated in the main process.
  2. If I don’t set the start method to spawn on Linux, I get a SIGSEGV as soon as the array is accessed in subproc2.

What I want to test next is whether passing this memory to dataloaders as is, i.e. as an np.array, increases memory consumption.

Let me do that now.

import ctypes
import time
import numpy as np
import torch.multiprocessing as mp
import torch
from torch.utils.data import Dataset

class SharedDataset1(Dataset):
    def __init__(self, shared_mem):
        super(SharedDataset1, self).__init__()
        self.shared_mem = shared_mem

    def __len__(self):
        return 10000

    def __getitem__(self, idx):
        # return dummy data; only the memory behaviour is being tested here
        return torch.randn(3, 32, 32)

def np_before(gpu, array, num_workers):
    # Convert the SynchronizedArray to a plain numpy view *before* building
    # the dataset, so the DataLoader workers have to pickle the ndarray itself.
    with array.get_lock():
        np_array = np.ctypeslib.as_array(array.get_obj())

    dataset = SharedDataset1(np_array)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=10, num_workers=num_workers)

    for img in dataloader:
        c = img+0.1
        time.sleep(3)

if __name__ == "__main__":
    mp.set_start_method('spawn')
    array = mp.Array(ctypes.c_byte, 10000*1024*1024, lock=True)
    with array.get_lock():
        np_array = np.ctypeslib.as_array(array.get_obj())
        np_array.fill(100)

    print("allocated")
    mp.spawn(np_before, args=(array, 1), nprocs=2)
    print("started")

    begin = time.time()
    while time.time() - begin < 100:
        a = 1 * 100

This gives the following error

Traceback (most recent call last):
  File "mem_data.py", line 64, in <module>
    mp.spawn(np_before, args=(array, 1), nprocs=2)
  File "/home/parawr/.conda/envs/faclab/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 200, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/home/parawr/.conda/envs/faclab/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 158, in start_processes
    while not context.join():
  File "/home/parawr/.conda/envs/faclab/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 119, in join
    raise Exception(msg)

Traceback (most recent call last):
  File "/home/parawr/.conda/envs/faclab/lib/python3.7/site-packages/torch/multiprocessing/spawn.py", line 20, in _wrap
    fn(i, *args)
  File "/home/parawr/Projects/shared/mem_data.py", line 40, in np_before
    for img in dataloader:
  File "/home/parawr/.conda/envs/faclab/lib/python3.7/site-packages/torch/utils/data/dataloader.py", line 279, in __iter__
    return _MultiProcessingDataLoaderIter(self)
  File "/home/parawr/.conda/envs/faclab/lib/python3.7/site-packages/torch/utils/data/dataloader.py", line 719, in __init__
    w.start()
  File "/home/parawr/.conda/envs/faclab/lib/python3.7/multiprocessing/process.py", line 112, in start
    self._popen = self._Popen(self)
  File "/home/parawr/.conda/envs/faclab/lib/python3.7/multiprocessing/context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/home/parawr/.conda/envs/faclab/lib/python3.7/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/home/parawr/.conda/envs/faclab/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/home/parawr/.conda/envs/faclab/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/home/parawr/.conda/envs/faclab/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/home/parawr/.conda/envs/faclab/lib/python3.7/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
OverflowError: cannot serialize a bytes object larger than 4 GiB

This, on the other hand, works:

def np_after(gpu, array, num_workers):
    # Hand the SynchronizedArray itself to the dataset; only the shared-memory
    # handle gets pickled when the DataLoader workers are started.
    dataset = SharedDataset1(array)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=10, num_workers=num_workers)

    for ii, img in enumerate(dataloader):
        print(ii)
        c = img+0.1
        time.sleep(1)
    return 0

class SharedDataset2(Dataset):
    def __init__(self, shared_mem):
        super(SharedDataset2, self).__init__()
        with shared_mem.get_lock():
            self.shared_mem = np.ctypeslib.as_array(shared_mem.get_obj())

    def __len__(self):
        return 10000

    def __getitem__(self, idx):
        return torch.randn(3, 32, 32)
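
For completeness, the launcher is the same as before and only the spawned function changes (sketch, reusing the 10 GB allocation from the previous snippet):

if __name__ == "__main__":
    mp.set_start_method('spawn')
    array = mp.Array(ctypes.c_byte, 10000*1024*1024, lock=True)
    with array.get_lock():
        np.ctypeslib.as_array(array.get_obj()).fill(100)

    print("allocated")
    mp.spawn(np_after, args=(array, 1), nprocs=2)
    print("done")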

I think it comes down to what Python pickles: array / shared_mem is a SynchronizedArray object, which Python knows not to pickle but to share, while in the np_before function it has already been turned into an np.ndarray, which Python then tries to pickle.
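
Based on that, the pattern I plan to use is to keep the SynchronizedArray inside the dataset and only build the numpy view lazily in each worker, something like this (untested sketch; LazyViewDataset is just a placeholder name):

import numpy as np
import torch.multiprocessing as mp
from torch.utils.data import Dataset

class LazyViewDataset(Dataset):
    def __init__(self, shared_mem, length):
        super().__init__()
        self.shared_mem = shared_mem   # SynchronizedArray: cheap to pickle to workers
        self.length = length
        self._view = None              # numpy view, rebuilt inside each worker

    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        # Build the view on first access, i.e. after the worker has started,
        # so the big ndarray itself is never pickled.
        if self._view is None:
            self._view = np.ctypeslib.as_array(self.shared_mem.get_obj())
        return float(self._view[idx])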

Let me now try to fix the PIL issue. I will benchmark it to see if it actually speeds up data loading.

I will also have to check validity: I wouldn’t want the dataloader workers overwriting each other’s data. A rough way to handle that is sketched below.
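
(Sketch only; self.filled is a hypothetical shared c_byte array of length len(dataset), allocated in the launcher next to img_cache with lock=False. Because every worker decodes the same file for a given idx, concurrent writes to the same slot store identical bytes and should be harmless.)

    def __getitem__(self, idx):
        path, lbl = self.dset.samples[idx]

        if not self.filled[idx]:
            with open(path, 'rb') as f:
                img = Image.open(f).convert('RGB').resize((256, 256), Image.LANCZOS)
            self.img_cache[idx] = np.asarray(img)
            self.filled[idx] = 1   # mark the slot only after it has been written

        return self.transform(Image.fromarray(self.img_cache[idx], 'RGB'))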

Your diagnosis of the problem source is correct: the dataset passed to the DataLoader is distributed to the workers and is therefore pickled. Good work, looking forward to your results. :blush: