`Exception: process 0 terminated with exit code 1` error when using `torch.multiprocessing.spawn` to parallelize over multiple GPUs

I have the following code below using torch.multiprocessing.spawn to parallelize over multiple GPUs:

import numpy as np
import torch
from torch.multiprocessing import Pool, set_start_method, spawn

X = np.array([[1, 3, 2, 3], [2, 3, 5, 6], [1, 2, 3, 4]])
X = torch.DoubleTensor(X)

def X_power_func(j):
    X_power = X.cuda()**j
    return X_power

if __name__ == '__main__':
    results = spawn(X_power_func, range(4), nprocs=1)

results

But I am getting this error below when I run the code:

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-9-97eb990d7396> in <module>()
     12 
     13 if __name__ == '__main__':
---> 14     results = spawn(X_power_func, range(4), nprocs=1)
     15 
     16 results

2 frames
/usr/local/lib/python3.6/dist-packages/torch/multiprocessing/spawn.py in join(self, timeout)
    111                 raise Exception(
    112                     "process %d terminated with exit code %d" %
--> 113                     (error_index, exitcode)
    114                 )
    115 

Exception: process 0 terminated with exit code 1

What have I done wrong in my code?

First, please read the API documentation carefully:

torch.multiprocessing.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')

Parameters

  1. fn (function) – …The function is called as fn(i, *args), where i is the process index and args is the passed through tuple of arguments.
  2. args (tuple) – Arguments passed to fn.
  3. nprocs (int) – Number of processes to spawn.

Returns
None if join is True, ProcessContext if join is False

First, you should call your function as:

if __name__ == '__main__':
    # start with 4 processes
    # your original method will invoke your function as X_power_func(0, 0, 1, 2, 3)
    spawn(X_power_func, nprocs=4)
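
To make the calling convention concrete, here is a minimal CPU-only sketch (not from the thread). The names x_power and x are illustrative, and it assumes the code is saved as a script rather than run in a notebook cell, since spawned processes need to import the function from the main module. spawn supplies the process index as the first argument and appends whatever is in args:

```python
import torch
import torch.multiprocessing as mp


def x_power(rank, x):
    # spawn supplies `rank` (0 .. nprocs-1) itself; `x` comes from `args`.
    result = x ** rank
    print(f"rank {rank}: {result.tolist()}")
    return result  # discarded by spawn; returned only to make that point


if __name__ == "__main__":
    x = torch.tensor([[1.0, 3.0], [2.0, 5.0]], dtype=torch.float64)
    # Two processes: process 0 runs x_power(0, x), process 1 runs x_power(1, x).
    ret = mp.spawn(x_power, args=(x,), nprocs=2)
    print(ret)  # None, because join=True by default
```

Note that the tensor is passed through args instead of being read from a global, and that the value returned by x_power never reaches the parent.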

Secondly, do not access results outside the main guard:

if __name__ == '__main__':
    results = spawn(X_power_func, range(4), nprocs=1)

results

This is because your subprocesses will also try to access “results”, but since they are not “main”, “results” is not defined there.

Thirdly, spawn will not return results.

Returns
None if join is True, ProcessContext if join is False

In summary, please re-read the documentation; it takes a lot of time to write.


Many thanks @iffiX for input on this. I am new to torch.multiprocessing.spawn and PyTorch in general, so I guess I just got confused when I read the documentation on it.

  1. Regarding your first point above, are you missing args=()? I.e.:
if __name__ == '__main__':
    # start with 4 processes
    # your original method will invoke your function as X_power_func(0, 0, 1, 2, 3)
    spawn(X_power_func, args=(0, 1, 2, 3), nprocs=4)

I don’t understand: why must we start with 4 processes?

  2. With your second point, if the results are kept within main, how would I then output the results outside of main?

  3. With your third point, can I ask what is ProcessContext? If I want to return results, does that mean I need to set join=False?

I am sorry for having so many questions. Would really be grateful if you could help.

Well, being new is not an excuse! :rofl:

  1. Yes, I left out args deliberately: torch.multiprocessing.spawn invokes your function as X_power_func(rank), where the first argument is always the rank (index) of the started process.
  2. If you want to see the results outside of main, you may print them in the invoked function:
    def X_power_func(j):
         X_power = X.cuda()**j
         print(X_power)
         return X_power
    
  3. No. To properly return results, you should either use torch.multiprocessing.Pool or pass in a pipe object, or anything else that can be used for inter-process communication.
    The torch multiprocessing module is a very thin wrapper around the original multiprocessing module; it basically just registers some customized serializers.
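
As a rough illustration of the Pool route mentioned in point 3, the sketch below uses a spawn context and hands the tensor to each job explicitly, so the worker does not depend on a global. It runs on the CPU; x_power and the (x, j) job tuple are illustrative choices rather than part of the original code, and the script is assumed to be run as a file, not from a notebook cell.

```python
import torch
import torch.multiprocessing as mp


def x_power(job):
    # Each job carries its own inputs, so the worker needs no globals.
    x, j = job
    return x ** j


if __name__ == "__main__":
    x = torch.tensor([[1.0, 3.0], [2.0, 5.0]])
    ctx = mp.get_context("spawn")  # the start method that CUDA requires
    with ctx.Pool(processes=2) as pool:
        # Unlike spawn, map collects the workers' return values.
        results = pool.map(x_power, [(x, j) for j in range(4)])
        for j, r in enumerate(results):
            print(j, r.tolist())
```

For GPU work, one option would be to include a device index in each job tuple and move x to that device inside the worker; that part is left out here so the sketch stays runnable without a GPU.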

Or you could post your detailed requirements so that we can work out a proper solution for you.


Yeah you’re right @iffiX, I should mention here what I actually want to do.

I am trying to figure out a way to parallelize over multiple GPUs on non-neural net computations in PyTorch. More specifically, I have developed an estimator (non-neural net) in Scikit-learn and the speed performance is slow when a certain hyperparameter is increased. To solve this, I am re-writing the estimator in PyTorch so that I can make use of GPU processing and hopefully multiple GPUs as well.

I have posted a question here in PyTorch Forums (1-2 days ago), Stackoverflow, PyTorch Github and multiple channels in Reddit and I haven’t gotten a reply yet or no one seems to know the full solution. In fact, what I got is the opposite where a lot of folks over at Reddit wanted to know how this is done too. So far, I feel like you’re the only one who seems to know :sweat_smile:

Anyhow, I did initially try using torch.multiprocessing.Pool. The MRE code is below:

import numpy as np
import torch
from torch.multiprocessing import Pool, set_start_method

X = np.array([[1, 3, 2, 3], [2, 3, 5, 6], [1, 2, 3, 4]])
X = torch.DoubleTensor(X)

def X_power_func(j):
    X_power = X.cuda()**j
    return X_power

if __name__ == '__main__':
  set_start_method('spawn', force=True)
  with Pool(processes = 2) as p:   # Parallelizing over 2 GPUs
    results = p.map(X_power_func, range(4))

However when I run this code, it hangs or keeps running forever without any errors.

When I removed set_start_method('spawn', force=True), the code ran properly and gave me the results, but only the first time. On subsequent runs, I get the error RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method.

Someone on Reddit suggested that I should use torch.multiprocessing.spawn, which led me back here to the PyTorch Forums with this post.

This error is here because:

X = torch.DoubleTensor(X)

def X_power_func(j):
    X_power = X.cuda()**j
    return X_power

You are referencing a global variable here. Since “fork” maps the parent's memory into the child, the function and the global X it references reach the subprocesses correctly. However, “fork” is not compatible with CUDA, and therefore the error is thrown.

Could you please show the full code of your estimator? Writing parallel programs in python could be a real pain, with your full code we can choose the most efficient and simplest solution.


Ok, the source code for the estimator is here:

It has already been parallelized for multiple CPUs but it’s still slow when the hyperparameter n_copies is increased. This hyperparameter controls the number of times a Kronecker tensor product is performed, which will result in multiplication of very large matrices when it is increased, therefore slowing down the speed performance when this hyperparameter is large.

You can use share_memory_() and torch.multiprocessing.SimpleQueue to implement IPC. E.g.:

import numpy as np
import torch
import torch.multiprocessing as mp


def func(rank, x, p2c, c2p):
    x_power = x.to(rank) ** rank
    c2p.put(x_power)
    # citing multiprocessing doc: Unlike CPU tensors, the 
    # sending process is required to keep the original tensor 
    # as long as the receiving process retains a copy of 
    # the tensor. The refcounting is implemented under the 
    # hood but requires users to follow the next best practices.
    p2c.get()
    print(f"child-{rank} done")

if __name__ == '__main__':
    nprocs = 2
    x = torch.ones(2, 2)
    x.share_memory_()
    ctx = mp.get_context('spawn')
    c2p, p2c = ctx.SimpleQueue(), ctx.SimpleQueue()
    ps = [ctx.Process(target=func, args=(rank, x, p2c, c2p)) for rank in range(nprocs)]
    [p.start() for p in ps]
    tensors = [c2p.get() for _ in range(nprocs)]
    print(tensors)
    del tensors
    for p in ps:
        p2c.put(0)
        p.join()
    print("parent done")

I have read your code:

  1. So the first outer loop (function) is:
    L148:             def X_prime_class_split_func(j):
    
    What’s the shape and data type of X_prime_class_split[j]? Maybe we could represent it as a tensor
  2. from L162-L166:
     for k in range(m_class_split):
           # Encode vectors into quantum densities
           X_prime_class_split_each_row = X_prime_class_split_jth[k, :]
           density_each_row = np.dot(X_prime_class_split_each_row.reshape(-1, 1),
                                                       X_prime_class_split_each_row.reshape(1, -1))
    
    You could definitely vectorize this inner loop.
  3. from L171-L174
    else:
          density_each_row_copy = density_each_row
          for u in range(self.n_copies - 1):
                density_each_row = np.kron(density_each_row, density_each_row_copy)
    
    There is no efficient way to parallelize the Kronecker product over n_copies, since this code is iterative and strongly serial. But you could use the einsum function of PyTorch to calculate it:
    def kronecker(A, B):
        # reshape (rather than view) also works if einsum returns a non-contiguous result
        return torch.einsum("ab,cd->acbd", A, B).reshape(A.size(0)*B.size(0),  A.size(1)*B.size(1))
    
  4. The scale (computational intensity) of your code does not suit process-based parallelism, and thread-based parallelism is not a good fit in your case either. I suggest you “vectorize” your code, using tensor operations instead of for loops; it would be at least 10 times more efficient.
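
As a sanity check on the einsum formulation in point 3, the sketch below compares it with torch.kron and applies it n_copies - 1 times, mirroring the original loop. It assumes a PyTorch version recent enough to provide torch.kron and torch.outer; kron_power is a hypothetical helper name.

```python
import torch


def kronecker(a, b):
    # out[i*p + k, j*q + l] = a[i, j] * b[k, l] for a: (m, n), b: (p, q)
    m, n = a.shape
    p, q = b.shape
    return torch.einsum("ab,cd->acbd", a, b).reshape(m * p, n * q)


def kron_power(a, n_copies):
    # n_copies - 1 successive products; each step needs the previous result,
    # which is why this loop cannot be parallelized over n_copies.
    out = a
    for _ in range(n_copies - 1):
        out = kronecker(out, a)
    return out


v = torch.tensor([1.0, 2.0, 3.0], dtype=torch.float64)
density = torch.outer(v, v)                 # 3 x 3 density of one row vector
print(kron_power(density, 3).shape)         # torch.Size([27, 27])
print(torch.allclose(kronecker(density, density),
                     torch.kron(density, density)))  # True
```

The side length grows as (n_features + 1)**n_copies, so the cost of each step rises quickly with n_copies even though the number of steps is small.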

Hi @iffiX,

What’s the shape and data type of X_prime_class_split[j] ?
X_prime_class_split[j] is a 2d numpy array. My CPU parallelization is two-fold. First, it parallelizes over the 2 binary classes. Secondly, for each binary class, the dataset is split into batches and parallelization is performed over the batches. X_prime_class_split[j] is just a dataset batch, therefore it is a 2d numpy array.

Maybe we could represent it as a tensor.
I have actually already converted all of the numpy functions in my code into PyTorch functions. It is here in my Github. Of course, this code is not fully working properly yet because of the issue with torch.multiprocessing.Pool. So in this code, X_prime_class_split[j] is a PyTorch tensor.

You could definitely vectorize this inner loop.
Thanks for the tip! I didn’t realize chunks of code could also be vectorized (using numpy.vectorize). I googled around but couldn’t find the PyTorch equivalent of numpy.vectorize. If I were to vectorize this part of the code, do you know how I could do it using PyTorch tensors?
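
For what it's worth, numpy.vectorize is a convenience wrapper that still loops in Python, so it is probably not what was meant here. “Vectorizing” in the sense used above means replacing the Python loop over rows with a single batched tensor operation. A minimal sketch, with illustrative function names that do not come from the estimator:

```python
import torch


def densities_loop(x):
    # One outer product per row, as in the original inner loop.
    return torch.stack([torch.outer(row, row) for row in x])


def densities_batched(x):
    # All outer products at once: out[k, i, j] = x[k, i] * x[k, j]
    return torch.einsum("ki,kj->kij", x, x)


x = torch.tensor([[1.0, 3.0, 2.0], [2.0, 3.0, 5.0]], dtype=torch.float64)
print(densities_batched(x).shape)  # torch.Size([2, 3, 3])
print(torch.equal(densities_loop(x), densities_batched(x)))  # True
```

Both functions produce the same stack of density matrices; the batched version simply lets PyTorch run the loop internally, on whichever device x lives on.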

There is no efficient way to parallelize kronecker product over n_copies since these code are iterative and strongly serial.
Agreed, there is no way to efficiently parallelize the Kronecker product because of its iterative and serial nature. As mentioned above, my code is actually parallelized over the 2 binary classes and batches of the dataset, so I am not looking to parallelize the Kronecker product.

But you could use the einsum function of pytorch to calculate…
I have actually already done this in my Github link above! This gives me some comfort knowing that I am on the same page as you :smile:

I guess if I can’t parallelize my code over multiple GPUs, plan B would be to rewrite my code to just use 1 GPU processing.

Many many thanks again for having a look @iffiX. Really appreciate it heaps!

Many thanks @mrshenli.

Your code looks interesting. I am new to PyTorch, so I will have to go through it line by line to see what it is doing and whether it helps with what I want to do.

Hello, I am optimizing your code, what’s the usual size of n_samples and n_features?

If n_samples * n_features < 1e8, and you have a medium to good GPU (>=GTX1080), then there is no need to split by class, below is the “vectorized” implementation, not tested.

# new implementation of kronecker
def kronecker(A, B):
    return torch.einsum('nab,ncd->nacbd', A, B)\
        .reshape(A.size(0), A.size(1)*B.size(1), A.size(2)*B.size(2))

Main code, no need to use pools or whatever:

# according to your code, the shape of `X_prime` should be (n_samples, n_features + 1)
# whether encoding = "amplit" or "stereo"
rows = n_samples
cols = n_features + 1


# you may keep this if "n_samples * n_features" > 1e8
# X_prime_class = X_prime[y_class_index == i]

# Number of rows/columns in density matrix
density_nrow_ncol = (n_features + 1)**self.n_copies

# Encode vectors into quantum densities
# density: [rows, cols, cols], each density[k] is the original `density_each_row`
density = torch.matmul(X_prime.view(rows, cols, 1),
                       X_prime.view(rows, 1, cols))

# Calculate n-fold Kronecker tensor product
if self.n_copies == 1:
    density = density
else:
    density_copy = density
    for u in range(self.n_copies - 1):
        density = kronecker(density, density_copy)

# Calculate sum of quantum densities belonging to either class, per subset split
density_sum = density.sum(dim = 0)

# calculate centroid and q_hels_obs_terms
# (m_class and m are taken from the original estimator code)
centroid = (1 / (m_class + 1e-6))*density_sum

if self.class_wgt == 'equi':
    q_hels_obs_terms = 0.5*centroid
elif self.class_wgt == 'weighted':
    q_hels_obs_terms = (m_class / m)*centroid
else:
    raise ValueError('class_wgt should be "equi" or "weighted"')

Hi @iffiX,

The usual size of n_samples and n_features can be anything really, since I have written a classifier that can be used for any general datasets. But I will take note of your rule of thumb about if “n_samples * n_features < 1e8, then no need to split by class”.

Yes you are correct, X_prime is always (n_samples, n_features + 1) regardless of encoding.

I see what you mean now by “vectorize”. First, torch.matmul() is itself “vectorized”: it can multiply all the vectors inside a matrix in one batched call. Second, rather than just using 2d tensors, I can use 3d (or higher-dimensional) tensors to incorporate the 2 classes into one tensor object, and the vectors inside this one higher-dimensional tensor can then be processed in a “vectorized” manner.

Questions:

  1. Can I ask how did you determine the value 1e8 in n_samples * n_features < 1e8?
  2. Can I ask why is there a need to add 1e-6 in (m_class + 1e-6)?
  3. So for my code, it is difficult to parallelize over multiple GPUs at all? It would be nice if at most I could perhaps just parallelize over the 2 classes on 2 GPUs.
1 Like
  1. A general estimate from experience. It's very rough; for your algorithm it could be anywhere between 1e6 and 1e9 depending on the platform and n_features.
  2. To prevent division by zero; with it the denominator is always > 0.
  3. For multiple GPUs, split by class; coarser-granularity splits fit GPUs better.

Sorry, what did you mean by “coarser granularity splits fit GPUs better”?

Simply: give larger chunks of data to GPU, they are beasts.


Ok many thanks once again @iffiX

To parallelize over the 2 GPUs, I guess I can use something like this:

device = torch.device("cuda:0")
X1 = X1.to(device)
device = torch.device("cuda:1")
X2 = X2.to(device)
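
A slightly fuller sketch of this idea, assuming each class's tensor fits on one GPU. pick_devices and class_density_sum are hypothetical helpers (not from the thread), and the code falls back to the CPU when no GPU is present so it can be tried anywhere. Because CUDA work is queued asynchronously, the two per-class computations may overlap even though they are issued from a single Python process.

```python
import torch


def pick_devices(n_wanted):
    # Use the GPUs that exist; fall back to the CPU when there are none.
    n_gpus = torch.cuda.device_count()
    if n_gpus == 0:
        return [torch.device("cpu")] * n_wanted
    return [torch.device(f"cuda:{i % n_gpus}") for i in range(n_wanted)]


def class_density_sum(x, device):
    # Move one class's rows to its device and sum the per-row densities there.
    x = x.to(device)
    density = torch.einsum("ki,kj->kij", x, x)
    return density.sum(dim=0)


X1 = torch.rand(4, 3, dtype=torch.float64)  # rows of class 0 (made-up data)
X2 = torch.rand(6, 3, dtype=torch.float64)  # rows of class 1 (made-up data)

devices = pick_devices(2)
sums = [class_density_sum(x, d) for x, d in zip((X1, X2), devices)]
sums = [s.cpu() for s in sums]  # copying back waits for each device to finish
print([tuple(s.shape) for s in sums])  # [(3, 3), (3, 3)]
```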

Hi @iffiX, really sorry to bother you but would be great if I could get your input on one more question.

If I were to follow the code that you had suggested, where every row vector (of the dataset, or more specifically of X_prime) is converted to a matrix (i.e. a density) and then these matrices are all put into one tensor object, I think this will run into memory blow-out issues. For example, if my dataset has 200,000 rows, then I will have 200,000 matrices that need to be stored at one time, and this could cause a memory blow-out.

In comparison, the code that I have calculates the running sum of the matrices/densities at every step when a new row vector is converted to a matrix, therefore I do not need to store all the matrices in one tensor object (and then only sum them all up after).
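
One possible middle ground, which is not something proposed in the thread, is to keep the batched tensor operations but feed the rows through in chunks, adding each chunk's densities to a running sum. As a rough, hypothetical illustration of the scale: with 10 columns and n_copies = 2, each density is 100 x 100 float64 values (80 KB), so 200,000 of them would need about 16 GB, whereas a chunk of 1,024 rows needs about 80 MB. The names density_sum_chunked, batched_kronecker and chunk_rows below are illustrative.

```python
import torch


def batched_kronecker(a, b):
    # Kronecker product of a[k] and b[k] for every k in the batch.
    n = a.shape[0]
    return torch.einsum("nab,ncd->nacbd", a, b).reshape(
        n, a.shape[1] * b.shape[1], a.shape[2] * b.shape[2])


def density_sum_chunked(x_prime, n_copies=1, chunk_rows=1024):
    # Running sum of densities; at most `chunk_rows` density matrices
    # are held in memory at any one time.
    size = x_prime.shape[1] ** n_copies
    total = torch.zeros(size, size, dtype=x_prime.dtype, device=x_prime.device)
    for chunk in torch.split(x_prime, chunk_rows, dim=0):
        base = torch.einsum("ki,kj->kij", chunk, chunk)  # per-row densities
        density = base
        for _ in range(n_copies - 1):
            density = batched_kronecker(density, base)
        total += density.sum(dim=0)
    return total


x = torch.rand(10, 3, dtype=torch.float64)
print(density_sum_chunked(x, n_copies=2, chunk_rows=4).shape)  # torch.Size([9, 9])
```

The chunk size trades memory for the number of loop iterations; a chunk of one row reproduces the original row-by-row running sum.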

Just wanted to get your thoughts on this. It’s ok too if you’re unsure.

Many thanks once again.