ProcessExitedException: process 0 terminated with exit code 1

The basic example I am trying to run:

"""
Based on: Getting Started with Distributed Data Parallel — PyTorch Tutorials 2.1.1+cu121 documentation

Correctness of code: machine learning - How to parallelize a training loop ever samples of a batch when CPU is only available in pytorch? - Stack Overflow

Note: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use
different communication backends and are not restricted to being executed on the same machine.
"""
import time

from typing import Tuple

import torch
from torch import nn, optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

import os

num_epochs = 5
batch_size = 8
Din, Dout = 10, 5
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
data = [(i*data_x, i*data_y) for i in range(num_epochs)]

class PerDeviceModel(nn.Module):
    """
    Toy example for a model run in parallel but not distributed across gpus
    (only processes with their own gpu or hardware)
    """
    def __init__(self):
        super().__init__()
        self.net1 = nn.Linear(Din, Din)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(Din, Dout)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))

def setup_process(rank, world_size, backend='nccl'):
    """
    Initialize the distributed environment (for each process).

    gloo is a collective communications library (https://github.com/facebookincubator/gloo). My understanding is that
    it's a library/API for processes to communicate/coordinate with each other/the master. It's a backend library.
    """
    # set up the master's ip address so this child process can coordinate
    # os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
    # if torch.cuda.is_available():
    #     backend = 'nccl'
    # Initializes the default distributed process group, and this will also initialize the distributed package.
    dist.init_process_group(backend, rank=rank, world_size=world_size)

def cleanup():
    """ Destroy a given process group, and deinitialize the distributed package """
    dist.destroy_process_group()

def get_batch(batch: Tuple[torch.Tensor, torch.Tensor], rank):
    x, y = batch
    if torch.cuda.is_available():
        x, y = x.to(rank), y.to(rank)
    else:
        x, y = x.share_memory_(), y.share_memory_()
    return x, y

def get_ddp_model(model: nn.Module, rank):
    """
    Moves the underlying storage to shared memory.

        This is a no-op if the underlying storage is already in shared memory
        and for CUDA tensors. Tensors in shared memory cannot be resized.

    :return:

    TODO: does this have to be done outside or inside the process? My guess is that it doesn't matter because
    1) if it's on gpu, once it's in the right proc it moves it to the gpu with id rank via mdl.to(rank)
    2) if it's on cpu, then mdl.share_memory() or data.share_memory() is a no-op if it's already in shared memory
    """
    # if gpu avail do the standard of creating a model and moving the model to the GPU with id rank
    if torch.cuda.is_available():
        # create model and move it to GPU with id rank
        model = model.to(rank)
        ddp_model = DDP(model, device_ids=[rank])
    else:
        # if we want multiple cpus just make sure the model is shared properly across the cpus with share_memory()
        # note that this op is a no-op if it's already in shared memory
        model = model.share_memory()
        ddp_model = DDP(model)  # I think removing the device_ids should be fine...?
    return ddp_model
    # return OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()

def run_parallel_training_loop(rank, world_size):
    """
    Distributed function to be implemented later.

    This is the function that is actually run in each distributed process.

    Note: as DDP broadcasts model states from the rank 0 process to all other processes in the DDP constructor,
    you don't need to worry about different DDP processes starting from different model parameter initial values.
    """
    setup_process(rank, world_size)
    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')

    # get ddp model
    model = PerDeviceModel()
    ddp_model = get_ddp_model(model, rank)

    # do training
    for batch_idx, batch in enumerate(data):
        x, y = get_batch(batch, rank)
        loss_fn = nn.MSELoss()
        optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

        optimizer.zero_grad()
        outputs = ddp_model(x)
        # Gradient synchronization communications take place during the backward pass and overlap with the backward computation.
        loss_fn(outputs, y).backward()  # When backward() returns, param.grad already contains the synchronized gradient tensor.
        optimizer.step()  # TODO how does the optimizer know to do the gradient step only once?

    print()
    print(f"End running DDP with model parallel example on rank: {rank}.")
    print(f'End current process: {mp.current_process()}')
    print(f'End pid: {os.getpid()}')
    # Destroy a given process group, and deinitialize the distributed package
    cleanup()

def main():
    print()
    print('running main()')
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # args
    if torch.cuda.is_available():
        world_size = torch.cuda.device_count()
    else:
        world_size = mp.cpu_count()

    print(f'world_size={world_size}')
    mp.spawn(run_parallel_training_loop, args=(world_size,), nprocs=world_size)

if __name__ == "__main__":
    print('starting main')
    start = time.time()
    main()
    print(f'execution length = {time.time() - start}')
    print('Done!\a\n')

The error I get:

I tried reinstalling all the nvidia drivers, but the issue persists.


ProcessExitedException                    Traceback (most recent call last)
<ipython-input-...> in <module>
    151 print('starting main')
    152 start = time.time()
--> 153 main()
    154 print(f'execution length = {time.time() - start}')
    155 print('Done!\a\n')

<ipython-input-...> in main()
    146
    147     print(f'world_size={world_size}')
--> 148     mp.spawn(run_parallel_training_loop, args=(world_size,), nprocs=world_size)
    149
    150 if __name__ == "__main__":

~/anaconda3/envs/leaf/lib/python3.8/site-packages/torch/multiprocessing/spawn.py in spawn(fn, args, nprocs, join, daemon, start_method)
    228                ' torch.multiprocessing.start_process(…)' % start_method)
    229         warnings.warn(msg)
--> 230     return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')

~/anaconda3/envs/leaf/lib/python3.8/site-packages/torch/multiprocessing/spawn.py in start_processes(fn, args, nprocs, join, daemon, start_method)
    186
    187     # Loop on join until it returns True or raises an exception.
--> 188     while not context.join():
    189         pass
    190

~/anaconda3/envs/leaf/lib/python3.8/site-packages/torch/multiprocessing/spawn.py in join(self, timeout)
    137             )
    138         else:
--> 139             raise ProcessExitedException(
    140                 "process %d terminated with exit code %d" %
    141                 (error_index, exitcode),

ProcessExitedException: process 0 terminated with exit code 1

Post copy/pasted from another thread (let us know if it works for you, it did for me).

It took me quite some time to figure it out. Hopefully this is useful for someone.
In my case I was using a Jupyter Notebook, which was causing the exact same error. I wrote a small blog post about it with code examples for a fully working PyTorch multiprocessing setup with a Queue.
https://dataiskey.eu/jupyter-notebook-pytorch-multiprocessing/
Github with just the code example:
GitHub - olympus999/jupyter-notebook-pytorch-multiprocessing-queue: A simple workaround to run Pytorch multiprocessing in Jupyter Notebook. Also showing performance difference between normal Queue (not sharing memory) and Pytorch queue (sharing memory).
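
The gist of that workaround, as a minimal sketch (the module name ddp_workers.py and the function names are my own, not taken from the linked repo): keep the function you spawn in a plain .py file so the spawn start method can import it in the child processes, and pass results back through a torch.multiprocessing Queue instead of relying on the notebook's interactive namespace.

# ddp_workers.py -- hypothetical file name; the spawn start method needs the
# target function to live in an importable module, not in a notebook cell.
import torch
import torch.multiprocessing as mp

def work(rank, queue):
    # toy computation standing in for a real training step
    result = torch.randn(3, 3).sum().item()
    queue.put((rank, result))

def run(world_size=2):
    ctx = mp.get_context('spawn')  # same start method that mp.spawn uses
    queue = ctx.Queue()
    procs = [ctx.Process(target=work, args=(rank, queue)) for rank in range(world_size)]
    for p in procs:
        p.start()
    results = [queue.get() for _ in procs]  # drain the queue before joining
    for p in procs:
        p.join()
    return results

In the notebook you then only call from ddp_workers import run; print(run()). Draining the queue before join() matters because a child process cannot fully exit while items it produced are still buffered in the queue's feeder thread.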

Hi @ptrblck, I tried this code, but it gave me the error. I searched for the error but found no specific solution.

from torch.utils.data import DataLoader
from transformers import TextDataset,DataCollatorForLanguageModeling
#from transformers import  AutoModelWithLMHead
from transformers import  AutoModelForCausalLM
from transformers import AdamW, get_linear_schedule_with_warmup
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import numpy as np
import pandas as pd  # needed for the pd.read_csv calls below
import random
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import GPT2Tokenizer, GPT2LMHeadModel, AdamW, get_linear_schedule_with_warmup
from tqdm import tqdm, trange
import gc
import math
import os
import time
import datetime
import torch
import torch.distributed as dist
import sys
## the directory include the package from INVIDIA
#sys.path.append('/home/momenisa/GPU_ZIP_Apex/apex-master/apex/')
#from apex import amp
#from apex.parallel import DistributedDataParallel as DDP
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os
import random
######################
PathData='/home//NLP_Projects/CaseSummary_resolutionProject/Results_GPT_2/model_v4200_k_bs=16_lr=5e-05_epochs=20/'
pretrained_model='/home///GPT_2/'

weight_decay=0
learning_rate=5e-5
adam_epsilon=1e-8
warmup_steps = 1e2
lr=5e-5
Max_length=400

def format_time(elapsed):
    # helper referenced below but missing from the original post: format seconds as hh:mm:ss
    return str(datetime.timedelta(seconds=int(round(elapsed))))

################################################
class GPT2Dataset(Dataset):

    def __init__(self, txt_list, tokenizer, gpt2_type=pretrained_model, max_length=400):

        self.tokenizer = tokenizer
        self.input_ids = []
        self.attn_masks = []

        for txt in txt_list:

            encodings_dict = tokenizer('<|startoftext|>'+ txt + '<|endoftext|>', truncation=True, max_length=max_length, padding="max_length")

            self.input_ids.append(torch.tensor(encodings_dict['input_ids']))
            self.attn_masks.append(torch.tensor(encodings_dict['attention_mask']))

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return self.input_ids[idx], self.attn_masks[idx] 

######################################################
def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
#########################################################

def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
        
    print(PathData)
    
    print(rank)

    ### defined variable ###############
    seed_val = 42
    random.seed(seed_val)
    np.random.seed(seed_val)
    torch.manual_seed(seed_val)
    torch.cuda.manual_seed_all(seed_val)

    ###############################
    
    ddp_setup(rank, world_size)
    
    ###############################
    
    tokenizer = GPT2Tokenizer.from_pretrained(pretrained_model, bos_token='<|startoftext|>', eos_token='<|endoftext|>', pad_token='<|pad|>') #gpt2-small

    model = GPT2LMHeadModel.from_pretrained(pretrained_model)

    model.resize_token_embeddings(len(tokenizer))

    ## loading train and test datasets
    print(PathData)
    trains_titles=pd.read_csv(PathData+'/'+'traindata.csv')
    valid_titles=pd.read_csv(PathData+'/'+'validdata.csv')

    train_dataset = GPT2Dataset(trains_titles, tokenizer, max_length=Max_length)

    Val_dataset = GPT2Dataset(valid_titles, tokenizer, max_length=Max_length)

    ############################## train_loader and validation_loader ######################
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=batch_size,
                                               pin_memory=True,
                                               shuffle=False,
                                               sampler=DistributedSampler(train_dataset))

    validation_loader = torch.utils.data.DataLoader(dataset=Val_dataset,
                                                    batch_size=batch_size,
                                                    pin_memory=True,
                                                    shuffle=False,
                                                    sampler=DistributedSampler(Val_dataset))

    training_steps_per_epoch = len(train_loader)
    total_num_training_steps = int(training_steps_per_epoch * total_epochs)

    ################# define optimizer and scheduler #########################

    # Note: AdamW is a class from the huggingface library (as opposed to pytorch)
    optimizer = AdamW(model.parameters(), lr=learning_rate, eps=adam_epsilon)

    # Create the learning rate scheduler.
    # This changes the learning rate as the training loop progresses
    scheduler = get_linear_schedule_with_warmup(optimizer,
                                                num_warmup_steps=warmup_steps,
                                                num_training_steps=total_num_training_steps)

    ######################## applying DDP on the model for training ############################
    model = model.to(rank)
    model = DDP(model, device_ids=[rank])
    # ========================================
    #               Training
    # ========================================

    print("")
    print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
    print('Training...')
        
    training_stats = []


    for epoch_i in range(0, total_epochs):

        print("")
        print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, total_epochs))
        print('Training...')

        ##########################################
        train_loader.sampler.set_epoch(epoch_i)
        b_sz = len(next(iter(train_loader))[0])
        print(f"[GPU{rank}] Epoch {epoch_i} | Batchsize: {b_sz} | Steps: {len(train_loader)}")
        ##########################################

        t0 = time.time()

        total_train_loss = 0

        model.train()

        for step, batch in enumerate(train_loader):

            #################################
            b_input_ids = batch[0].to(rank)
            b_labels = batch[0].to(rank)
            b_masks = batch[1].to(rank)
            #################################

            optimizer.zero_grad()        

            outputs = model(  b_input_ids,
                             labels=b_labels, 
                              attention_mask = b_masks,
                              token_type_ids=None
                            )

            loss = outputs[0]  
            batch_loss = loss.item()
            total_train_loss += batch_loss
            loss.backward()
            optimizer.step()
            scheduler.step()

        # Calculate the average loss over all of the batches.
        avg_train_loss = total_train_loss / len(train_loader)

        del total_train_loss
        del batch_loss
        
        # Measure how long this epoch took.
        training_time = format_time(time.time() - t0)
        # ========================================
        #               Validation
        # ========================================

        print("")
        print("Running Validation...")

        avg_val_loss_1=[]
        t0 = time.time()
        #################### is this section correct for validation ???????????????????? #############
        model.eval()
        model = DDP(model, device_ids=[rank])
        ########################################3
        total_eval_loss = 0
        nb_eval_steps = 0
        
        ########################################
        validation_loader.sampler.set_epoch(epoch_i)
        b_sz = len(next(iter(validation_loader))[0])
        print(f"[GPU{rank}] Epoch {epoch_i} | Batchsize: {b_sz} | Steps: {len(validation_loader)}")
        ###########################################
        
        # Evaluate data for one epoch
        for batch in validation_loader:

            b_input_ids = batch[0].to(rank)
            b_labels = batch[0].to(rank)
            b_masks = batch[1].to(rank)

            with torch.no_grad():        
                outputs  = model(b_input_ids,attention_mask = b_masks,labels=b_labels)
                loss = outputs[0]  
            batch_loss = loss.item()
            total_eval_loss += batch_loss        

        avg_val_loss = total_eval_loss / len(validation_loader)

        perplexity=math.exp(avg_val_loss)

        avg_val_loss_1.append(avg_val_loss)

        validation_time = format_time(time.time() - t0)    

        del total_eval_loss 


        print("  Validation Loss: {0:.2f}".format(avg_val_loss))
        print("  Validation took: {:}".format(validation_time))

        # Record all statistics from this epoch.
        training_stats.append(
            {
                'epoch': epoch_i + 1,
                'Training Loss': avg_train_loss,
                'Valid. Loss': avg_val_loss,
                'Training Time': training_time,
                'Validation Time': validation_time,
                'perplexity': perplexity
            }
        )
        gc.collect()
        
        ################### saving the model ########################

        if rank == 0 and epoch_i % save_every == 0:

            Pathmodel = Results_Path + '/' + 'savemodel_epoch=' + str(epoch_i)

            if not os.path.isdir(Pathmodel):
                os.makedirs(Pathmodel)

            ckp = model.module.state_dict()
            torch.save(ckp, Pathmodel + '/' + 'checkpoint.pt')
    ############ save the results #####################
    Path_2=pt_save_directory+'/'+'training_stats='+str(0)+".csv"
    torch.save(training_stats,Path_2)

    #### is this a good place to put destroy_process_group() ??????????? ###########
    destroy_process_group()
    #############################
if __name__ == '__main__':
    import sys
    total_epochs=21
    save_every=5
    batch_size=32
    world_size = torch.cuda.device_count()
    print(world_size)
    mp.spawn(main, args=(world_size, save_every, total_epochs, batch_size), nprocs=world_size,join=True)

The stack trace doesn’t show the actual error message, so I don’t know what exactly is failing. Try to execute your script directly in the terminal instead of inside a notebook to see if a better error message is raised.
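
One way to follow that advice and still see the child’s real error when using mp.spawn is to wrap the spawned function so each rank prints its own traceback before re-raising. A minimal sketch, not taken from this thread; spawn_with_traceback and the wrapper name are made up, and it only helps when the failure happens inside your function (if the child dies before your function even runs, e.g. the notebook situation above, running the script from the terminal is still the way to see the message).

# debug helper (hypothetical), e.g. placed in the same training script:
import sys
import traceback

import torch.multiprocessing as mp

def _worker_with_traceback(rank, fn, fn_args):
    try:
        fn(rank, *fn_args)
    except Exception:
        # print the real error from inside the child process before it exits
        print(f"[rank {rank}] crashed with:\n{traceback.format_exc()}", file=sys.stderr)
        raise  # keep the non-zero exit code so the parent still notices

def spawn_with_traceback(fn, args=(), nprocs=1):
    mp.spawn(_worker_with_traceback, args=(fn, args), nprocs=nprocs, join=True)

# usage, with the run_parallel_training_loop from the first post:
# spawn_with_traceback(run_parallel_training_loop, args=(world_size,), nprocs=world_size)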