Memory leak when using RPC for pipeline parallelism

Hi, when I use torch.distributed.rpc to implement pipeline parallelism for transformer-based inference, the memory consumption increases with each forward pass. The code for 2 nodes looks like this.

First, I define two classes for the transformer shards:

import os
import sys
import threading
import time
import torch
import torch.nn as nn
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef
from transformers import ViTFeatureExtractor, ViTForImageClassification
#########################################################
#               Define Transformer Shard                #
#########################################################


class TransformerShard1(nn.Module):
    def __init__(self, device, config,num_layers):
        super().__init__()
        # self.config = config
        self.model = ViTForImageClassification.from_pretrained(config)
        self.model.vit.encoder.layer = nn.Sequential(*[self.model.vit.encoder.layer[i] for i in range(num_layers)])
        self.model.vit= nn.Sequential(*list(self.model.vit.children())[:-1])
        self.model = nn.Sequential(*list(self.model.children())[:-1])
        self._lock = threading.Lock()
        self.device = device

    def forward_kernel(self, x):
        x = self.model(x).to_tuple()[0]
        end = time.time()
        return x

    @torch.no_grad()
    def forward(self, pixel_values=None,
        head_mask=None,
        labels=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None):
        x = pixel_values.to_here().to(self.device)
        with self._lock:
            x = self.forward_kernel(x)
        return x.cpu()

class TransformerShard2(nn.Module):
    def __init__(self,device, config, num_layers):
        super().__init__()
        self.model = ViTForImageClassification.from_pretrained(config)
        self.model.vit.encoder.layer = nn.Sequential(*[self.model.vit.encoder.layer[i] for i in range(num_layers, 2*num_layers)])
        self.model.vit= nn.Sequential(*list(self.model.vit.children())[1:-1])
        self.model = nn.Sequential(*list(self.model.children())[:-1])
        self._lock = threading.Lock()
        self.device = device

    def forward_kernel(self, x):
        x = self.model(x)[0]
        return x

    @torch.no_grad()
    def forward(self, x_rref=None,
        head_mask=None,
        labels=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None):
        x = x_rref.to_here().to(self.device)
        with self._lock:
            x = self.forward_kernel(x)
        return x.cpu()

Then I stitch them into one class for forwarding:

class DistViT(nn.Module):
    def __init__(
        self, 
        split_size,
        workers,
        config,
        num_layers,
        *args, **kwargs
    ):
        super().__init__()


        self.split_size = split_size   # for microbatch
        self.num_layers = num_layers 
        self.p1_rref = rpc.remote(
            workers[0],
            TransformerShard1,
            args = ("cpu", config, int(num_layers/2)) + args,
            kwargs = kwargs
        )

        self.p2_rref = rpc.remote(
            workers[1],
            TransformerShard2,
            args = ("cpu", config, int(num_layers/2)) + args,
            kwargs = kwargs
        )
   

    def forward(self, pixel_values=None,
        head_mask=None,
        labels=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None):
        out_futures = []
        start = time.time()
        id = 0
        for x in iter(pixel_values.split(self.split_size, dim=0)):
            x_rref = RRef(x)
            y_rref = self.p1_rref.remote().forward(x_rref)
            z_fut = self.p2_rref.rpc_async().forward(y_rref) 
            out_futures.append(z_fut)
        torch.futures.wait_all(out_futures)
        return out_futures

    def parameter_rrefs(self):
        remote_params = []
        remote_params.extend(self.p1_rref.remote().parameter_rrefs().to_here())
        remote_params.extend(self.p2_rref.remote().parameter_rrefs().to_here())
        return remote_params

Then I run the RPC processes like this:

#########################################################
#                   Run RPC Processes                   #
#########################################################
config = 'google/vit-base-patch16-224'
num_layers = 12

num_batches = 1
batch_size = 256

img = torch.randn(3, 384, 384)
imgs = [img for i in range(batch_size)]

feature_extractor = ViTFeatureExtractor.from_pretrained(config)
def run_master(split_size):
    # put the two model parts on worker1 and worker2 respectively
    print("Run mastering \n")
    for si in range(len(split_size)):
        model = DistViT(split_size[si], ["worker0", "worker1"], config, num_layers)
        inputs = feature_extractor(images=imgs, return_tensors="pt")
        for i in range(num_batches):
            # generate random inputs and labels       
            outputs = model(**inputs)


def run_worker(rank, world_size, num_split):
     # run on local host
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29501'

    # Higher timeout is added to accommodate for kernel compilation time in case of ROCm.
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256,rpc_timeout=3000)

    if rank == 0:
        rpc.init_rpc(
            "worker0",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=options
        )
        run_master(num_split)
    else:
        rpc.init_rpc(
            "worker1",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=options
        )
        pass

    rpc.shutdown()

The main function:

if __name__=="__main__":
    world_size = 2
    rank=int(sys.argv[1])
    num_split=[8]

    print(f"{config}, {num_layers}, {num_split}")
    
    tik = time.time()
    run_worker(rank, world_size, num_split)
    tok = time.time()
    print(f"Total program execution time = {tok - tik}")

The script needs the transformers package, which can be installed with:

pip install transformers

The whole script is uploaded to Ubuntu Pastebin.

I run it on macOS with PyTorch 1.8.1 (CPU-only). The run commands are as follows.
On the first terminal:

python pipeline_parallelism.py 0

On the second terminal:

python pipeline_parallelism.py 1

I use the top command to check memory usage, and after each forward pass the memory increases by about 3 MB until OOM or the RPC shutdown. Could anyone help me find or fix the problem? Thank you very much!
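For reference, the memory can also be printed from inside each process with only the standard library; this is just a sketch (the helper name is mine), and note that ru_maxrss reports the peak RSS, in bytes on macOS and kilobytes on Linux:

import resource

def print_peak_rss(tag=""):
    # Peak resident set size of the current process:
    # bytes on macOS, kilobytes on Linux.
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    print(f"{tag} peak RSS: {peak}")

# e.g. call print_peak_rss(f"after batch {i}") inside the run_master loop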

This looks interesting!

I think first you need to narrow the debugging scope. For example, if you change the microbatch size to 1, does it still cause a memory leak? If so, then at least the issue is not the microbatching in pipeline parallelism.

On the other hand, have you tried the PyTorch native pipeline parallelism API here?
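For reference, here is a minimal single-process sketch of that API (assuming torch.distributed.pipeline.sync.Pipe from PyTorch >= 1.8; the official example puts the two stages on cuda:0/cuda:1, and I have not verified CPU placement, it is used below only to mirror the CPU-only setup in this thread):

import os
import torch
import torch.nn as nn
import torch.distributed.rpc as rpc
from torch.distributed.pipeline.sync import Pipe

# Pipe requires the RPC framework to be initialized, even for a single process.
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
rpc.init_rpc("worker", rank=0, world_size=1)

# Two pipeline stages; chunks is the number of micro-batches per forward pass.
stage1 = nn.Linear(16, 8)
stage2 = nn.Linear(8, 4)
model = Pipe(nn.Sequential(stage1, stage2), chunks=8)

out = model(torch.randn(32, 16)).local_value()  # forward() returns an RRef
rpc.shutdown()

Note that Pipe drives all stages from within a single process.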

Hi Yi Wang,

Thanks for your reply! When I limit the microbatch size to 1 and change num_batches to 10, the memory leak problem still exists. I will continue to debug this; any suggestions?

Thank you for recommending this API. But the PyTorch native pipeline parallelism API does not seem to work across multiple distributed machines, right? Currently we want to build pipeline parallelism for multiple nodes.

Best,
Yang

I will continue to debug this; any suggestions?

Then the bug is unlikely to be related to the pipelining logic. One thing I would check is all the RRef objects. Have you used any of the RRef objects or their fields temporarily? If so, you can try deleting them after use. For example, you could try del pixel_values after the line x = pixel_values.to_here().to(self.device).

Hi Yi Wang,

Thanks for your reply. I edited the code according to your suggestion (adding del pixel_values and del x_rref):

import gc  # needed for the gc.collect() calls below

class TransformerShard1(nn.Module):
    def __init__(self, device, config,num_layers):
        super().__init__()
        # self.config = config
        self.model = ViTForImageClassification.from_pretrained(config)
        self.model.vit.encoder.layer = nn.Sequential(*[self.model.vit.encoder.layer[i] for i in range(num_layers)])
        self.model.vit= nn.Sequential(*list(self.model.vit.children())[:-1])
        self.model = nn.Sequential(*list(self.model.children())[:-1])
        self._lock = threading.Lock()
        self.device = device

    def forward_kernel(self, x):
        x = self.model(x).to_tuple()[0]
        end = time.time()
        return x

    @torch.no_grad()
    def forward(self, pixel_values=None,
        head_mask=None,
        labels=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None):
        x = pixel_values.to_here().to(self.device)
        del pixel_values
        gc.collect()
        with self._lock:
            x = self.forward_kernel(x)
        print("Shard1 finish its microbatch ")
        return x.cpu()

class TransformerShard2(nn.Module):
    def __init__(self,device, config, num_layers):
        super().__init__()
        self.model = ViTForImageClassification.from_pretrained(config)
        self.model.vit.encoder.layer = nn.Sequential(*[self.model.vit.encoder.layer[i] for i in range(num_layers, 2*num_layers)])
        self.model.vit= nn.Sequential(*list(self.model.vit.children())[1:-1])
        self.model = nn.Sequential(*list(self.model.children())[:-1])
        self._lock = threading.Lock()
        self.device = device

    def forward_kernel(self, x):
        x = self.model(x)[0]
        return x

    @torch.no_grad()
    def forward(self, x_rref=None,
        head_mask=None,
        labels=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None):
        x = x_rref.to_here().to(self.device)
        del x_rref
        gc.collect()
        with self._lock:
            x = self.forward_kernel(x)
        print("Shard1 finish its microbatch ")
        return x.cpu()

The overall memory usage is reduced (with a microbatch size of 256, the peak memory drops from ~800 MB to ~500 MB). However, the memory still keeps growing after each forward pass. It is not even released after finishing a batch, which is very strange.

I only provided one example, so my suggestions are certainly not exhaustive.

Memory leaks are really tricky issues. You can try reverting code step by step and checking when the memory leak appears.

On the other hand, check out this question: How to debug causes of GPU memory leaks? - #3 by smth
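Another lightweight option is the standard-library tracemalloc, with the big caveat that it only sees Python-level allocations, so it will miss memory held by C++ tensor storage or by the RPC agent's thread pool. A rough sketch:

import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()
outputs = model(**inputs)            # one forward pass
after = tracemalloc.take_snapshot()
# print the ten call sites whose Python-side allocations grew the most
for stat in after.compare_to(before, "lineno")[:10]:
    print(stat)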

Hi Yi Wang,

Thanks for your reply and the useful link! I found that this memory leak is related to num_worker_threads=256 in options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256). Following your suggestion and the link, I first added:

for obj in gc.get_objects():
    try:
        if torch.is_tensor(obj) or (hasattr(obj, 'data') and torch.is_tensor(obj.data)):
            print(f"After forward: {type(obj)}, {obj.size()}")
    except:
        pass

to trace the tensors, and added del ... statements to make sure the same number of tensors exists across forward passes. But the memory still keeps growing.

However, when I reduce num_worker_threads=256 in options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256) to num_worker_threads=2, the memory stops growing. Furthermore, the memory shows significant growth only when I set num_worker_threads > 6.

I could find only a little information about this argument in the PyTorch documentation:

  • num_worker_threads (int, optional) – The number of threads in the thread-pool used by TensorPipeAgent to execute requests (default: 16).

And the pipeline parallelism tutorial sets it to 128, which I thought was related to the communication/request speed. I am not familiar with this argument and wonder:

  1. Why does larger num_worker_threads cause the memory leak problem?
  2. When do we need a larger num_worker_threads?

Do you have any related experience or suggestions? I need your help! Thank you very much!

Best,
Yang

  1. Why does larger num_worker_threads cause the memory leak problem?
  2. When do we need a larger num_worker_threads?

Good question! cc: @lcw (TensorPipe expert).

On the other hand, can you try ProcessGroupRpcBackendOptions to see if there is a memory leak? This backend will be deprecated soon, as it is much slower on GPUs, but you should still be able to use it for now.

Example code:

rpc.init_rpc(
    "worker1",
    rank=0,
    world_size=2,
    backend=rpc.BackendType.PROCESS_GROUP,
    rpc_backend_options=rpc.ProcessGroupRpcBackendOptions(
        num_send_recv_threads=16,
        rpc_timeout=20  # 20 second timeout
    )
)

Hi Yi Wang,

When I change to ProcessGroupRpcBackendOptions and set num_send_recv_threads=16, the memory also keeps growing with each forward pass. I observe it like this: the memory first drops by ~1 MB after one forward pass and then increases by >3 MB after several passes, which is similar to the TensorPipe backend.

In addition, the tutorial with mp.spawn(run_worker, args=(world_size, num_split), nprocs=world_size, join=True) and options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256, rpc_timeout=300) does not cause a memory leak on a single machine. However, if I change it for 2 nodes as follows, it shows the memory leak (the modified lines are marked with the comment # original:):

import os
import sys
import threading
import time
from functools import wraps

import torch
import torch.nn as nn
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.optim as optim
from torch.distributed.optim import DistributedOptimizer
from torch.distributed.rpc import RRef

from torchvision.models.resnet import Bottleneck


#########################################################
#           Define Model Parallel ResNet50              #
#########################################################

# In order to split the ResNet50 and place it on two different workers, we
# implement it in two model shards. The ResNetBase class defines common
# attributes and methods shared by two shards. ResNetShard1 and ResNetShard2
# contain two partitions of the model layers respectively.


num_classes = 1000


def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)

class ResNetBase(nn.Module):
    def __init__(self, block, inplanes, num_classes=1000,
                 groups=1, width_per_group=64, norm_layer=None):
        super(ResNetBase, self).__init__()

        self._lock = threading.Lock()
        self._block = block
        self._norm_layer = nn.BatchNorm2d
        self.inplanes = inplanes
        self.dilation = 1
        self.groups = groups
        self.base_width = width_per_group

    def _make_layer(self, planes, blocks, stride=1):
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if stride != 1 or self.inplanes != planes * self._block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * self._block.expansion, stride),
                norm_layer(planes * self._block.expansion),
            )

        layers = []
        layers.append(self._block(self.inplanes, planes, stride, downsample, self.groups,
                                  self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * self._block.expansion
        for _ in range(1, blocks):
            layers.append(self._block(self.inplanes, planes, groups=self.groups,
                                      base_width=self.base_width, dilation=self.dilation,
                                      norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def parameter_rrefs(self):
        r"""
        Create one RRef for each parameter in the given local module, and return a
        list of RRefs.
        """
        return [RRef(p) for p in self.parameters()]


class ResNetShard1(ResNetBase):
    """
    The first part of ResNet.
    """
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard1, self).__init__(
            Bottleneck, 64, num_classes=num_classes, *args, **kwargs)

        self.device = device
        self.seq = nn.Sequential(
            nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False),
            self._norm_layer(self.inplanes),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            self._make_layer(64, 3),
            self._make_layer(128, 4, stride=2)
        ).to(self.device)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        with self._lock:
            out =  self.seq(x)
            print("Shard1 finish 1 microbatch")
        return out.cpu()


class ResNetShard2(ResNetBase):
    """
    The second part of ResNet.
    """
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard2, self).__init__(
            Bottleneck, 512, num_classes=num_classes, *args, **kwargs)

        self.device = device
        self.seq = nn.Sequential(
            self._make_layer(256, 6, stride=2),
            self._make_layer(512, 3, stride=2),
            nn.AdaptiveAvgPool2d((1, 1)),
        ).to(self.device)

        self.fc =  nn.Linear(512 * self._block.expansion, num_classes).to(self.device)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        with self._lock:
            out = self.fc(torch.flatten(self.seq(x), 1))
            print("Shard2 finish 1 microbatch")
        return out.cpu()


class DistResNet50(nn.Module):
    """
    Assemble two parts as an nn.Module and define pipelining logic
    """
    def __init__(self, split_size, workers, *args, **kwargs):
        super(DistResNet50, self).__init__()

        self.split_size = split_size

        # Put the first part of the ResNet50 on workers[0]
        self.p1_rref = rpc.remote(
            workers[0],
            ResNetShard1,
            # original: args = ("cuda:0",) + args,
            args = ("cpu",) + args,
            kwargs = kwargs
        )

        # Put the second part of the ResNet50 on workers[1]
        self.p2_rref = rpc.remote(
            workers[1],
            ResNetShard2,
            # original: args = ("cuda:1",) + args,
            args = ("cpu",) + args,
            kwargs = kwargs
        )

    def forward(self, xs):
        # Split the input batch xs into micro-batches, and collect async RPC
        # futures into a list
        out_futures = []
        for x in iter(xs.split(self.split_size, dim=0)):
            x_rref = RRef(x)
            y_rref = self.p1_rref.remote().forward(x_rref)
            z_fut = self.p2_rref.rpc_async().forward(y_rref)
            out_futures.append(z_fut)

        # collect and cat all output tensors into one tensor.
        return torch.cat(torch.futures.wait_all(out_futures))

    def parameter_rrefs(self):
        remote_params = []
        remote_params.extend(self.p1_rref.remote().parameter_rrefs().to_here())
        remote_params.extend(self.p2_rref.remote().parameter_rrefs().to_here())
        return remote_params


#########################################################
#                   Run RPC Processes                   #
#########################################################

#original: num_batches = 3
num_batches = 10
batch_size = 120
image_w = 128
image_h = 128


def run_master(split_size):

    # put the two model parts on worker1 and worker2 respectively
    model = DistResNet50(split_size, ["worker0", "worker1"])
    loss_fn = nn.MSELoss()
    opt = DistributedOptimizer(
        optim.SGD,
        model.parameter_rrefs(),
        lr=0.05,
    )

    one_hot_indices = torch.LongTensor(batch_size) \
                           .random_(0, num_classes) \
                           .view(batch_size, 1)

    for i in range(num_batches):
        print(f"Processing batch {i}")
        # generate random inputs and labels
        inputs = torch.randn(batch_size, 3, image_w, image_h)
        labels = torch.zeros(batch_size, num_classes) \
                      .scatter_(1, one_hot_indices, 1)

        # The distributed autograd context is the dedicated scope for the
        # distributed backward pass to store gradients, which can later be
        # retrieved using the context_id by the distributed optimizer.
        with dist_autograd.context() as context_id:
            outputs = model(inputs)
            dist_autograd.backward(context_id, [loss_fn(outputs, labels)])
            opt.step(context_id)


def run_worker(rank, world_size, num_split):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'


    # Higher timeout is added to accommodate for kernel compilation time in case of ROCm.
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=128, rpc_timeout=300)

    if rank == 0:
        rpc.init_rpc(
             # original "master"
            "worker0",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=options
        )
        run_master(num_split)
    else:
        rpc.init_rpc(
            f"worker{rank}",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=options
        )
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__=="__main__":
    # original: world_size = 3
    world_size = 2
    rank=int(sys.argv[1])
    # original:  for num_split in [1, 2, 4, 8]:
    for num_split in [8]:
        # original: mp.spawn(run_worker, args=(world_size, num_split), nprocs=world_size, join=True)
        run_worker(rank, world_size, num_split)

Am I using RPC incorrectly for multiple nodes? Or do you know any references/examples for multiple nodes? Thanks for your time and your suggestions!

Best,
Yang

As I understand it, the original example uses 3 processes: 1 master plus worker1 and worker2, where the two workers are assigned to two CUDA devices. Now you use only 2 processes: worker0 (also acting as master) and worker1, both assigned to CPU, so basically worker0 runs RPC on itself.

Here are my suggestions:

  1. I don’t know whether a process running RPC on itself leads to a memory leak or not. Ideally it should behave the same as doing the work locally, but I am not sure how well such an unusual corner case is supported. You can update the code to train ResNetShard1 locally (see the sketch after this list).
  2. I think you should still use mp.spawn in the main function. I haven’t tried multi-node training myself. Does mp.spawn fail on multiple nodes?
  3. Out of curiosity, since I don’t have experience running the computation on CPU cores via RPC, I wonder if you can run the same code on 2 CPU cores of the same machine by changing “cuda:0” and “cuda:1” to “cpu:0” and “cpu:1”, respectively. We can check whether this also causes a memory leak. If so, the problem could be TensorPipe + CPU.
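For suggestion 1, here is a rough sketch of what I mean, reusing the imports and the ResNetShard1 class from your script (the loop count, input shape, and dummy loss are only illustrative):

# Train the first shard entirely inside one process, with no rpc.remote involved,
# and watch whether the memory still grows across iterations.
shard1 = ResNetShard1("cpu")
opt = optim.SGD(shard1.parameters(), lr=0.05)
x = torch.randn(8, 3, 128, 128)
for i in range(100):
    out = shard1.seq(x)   # call the underlying nn.Sequential directly,
                          # bypassing the RRef-based forward()
    loss = out.sum()      # dummy loss, just to drive a backward pass
    opt.zero_grad()
    loss.backward()
    opt.step()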

Hi Yi Wang,

Thanks for your good suggestions. First, I need to correct a mistake in my earlier observation:
using mp.spawn(run_worker, args=(world_size, num_split), nprocs=world_size, join=True) also shows the memory leak locally, the same as using run_worker(rank, world_size, num_split). I had only been observing the master process, which does not show growth. The worker processes do show growth, giving the same result as run_worker(rank, world_size, num_split), and this can also be observed in the ResNet tutorial.

In my script for transformer models, both the mp.spawn() method and run_worker() give similar results:

  • With num_worker_threads=2, memory does not increase
  • With num_worker_threads=16, memory first increases to a certain level and then stops growing
  • With num_worker_threads=128, memory keeps growing after every forward pass

For the ResNet tutorial, ResNet seems to have more activations to transmit:

  • With num_worker_threads=2, the run exceeds the set timeout (60000 ms)
  • With num_worker_threads=16, the run exceeds the set timeout (60000 ms)
  • With num_worker_threads=64, memory keeps growing after every forward pass within a batch and is reduced after the batch finishes, but the next batch reaches a larger value. E.g., it increases from ~230 MB to a peak of ~2090 MB in the first batch, then drops to ~235 MB and reaches a peak of ~2250 MB in the second batch. The growth trend does slow down, though.
  • With num_worker_threads=128, the behavior is similar to 64, but the peak memory is larger.

And for your suggestions:

I think it is not related to a process running RPC on itself, since when using mp.spawn() only the master process’s memory does not keep growing. But I am not sure whether this is a normal phenomenon, since the memory growth seems to slow down after multiple batches in the original tutorial. I will increase the batch size for my script.

The reason I do not use mp.spawn() is that I cannot assign a specific rank in mp.spawn() across multiple nodes. Do you have any suggestions or know a suitable usage of this API for that case?
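The workaround I can imagine is deriving the global rank from a node rank given on the command line plus the local index that mp.spawn passes in, something like the sketch below (assuming one process per node and reusing the run_worker defined above), but I am not sure whether this is the intended usage:

import sys
import torch.multiprocessing as mp

PROCS_PER_NODE = 1  # how many worker processes each machine launches

def spawn_entry(local_rank, node_rank, world_size, num_split):
    # mp.spawn only passes the local index (0..nprocs-1); combine it with a
    # per-node rank from the command line to get a unique global rank.
    global_rank = node_rank * PROCS_PER_NODE + local_rank
    run_worker(global_rank, world_size, num_split)

if __name__ == "__main__":
    node_rank = int(sys.argv[1])  # 0 on the first machine, 1 on the second
    mp.spawn(spawn_entry, args=(node_rank, 2, 8), nprocs=PROCS_PER_NODE, join=True)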

Yes, I changed “cuda:0” and “cuda:1” to “cpu:0” and “cpu:1”, and the code runs successfully. But it also shows the memory leak problem.

Thanks for your reply and suggestions! I hope to hear more of your thoughts.

Best,
Yang

Thanks for all the detailed comments! Just want to confirm my understanding:

Are you saying that, even in the ResNet tutorial rather than your own code, you can still observe the memory leak on devices cuda:0 and cuda:1, just not on the master process?

So far, the memory leak is independent of 1) the RPC backend (TensorPipe or ProcessGroup), 2) the worker’s device (GPU or CPU), and 3) how we launch distributed training (mp.spawn or just run_worker, even within a single host).

The only factor that is relevant to the memory leak is num_worker_threads:

  • With num_worker_threads=2, memory does not increase
  • With num_worker_threads=16, memory first increases to a certain level and then stops growing
  • With num_worker_threads=128, memory keeps growing after every forward pass

@mrshenli

Hi Yi Wang,

Yes, I agree with all you said, except that I did not test it on GPU (cuda:0, cuda:1) since I do not have enough GPU devices.

Yes, I agree with all you said, except that I did not test it on GPU (cuda:0, cuda:1) since I do not have enough GPU devices.

So basically you mean the memory leak is reproducible as long as “cuda” is replaced by “cpu” in the ResNet tutorial.

If so, can you file a bug here? I will make the RPC developers aware of it. Thanks!

Hi Yi Wang,

I opened an issue 3 days ago, but there has been no response so far. It would be very helpful if you could make them aware of it! Thank you very much!

Best,
Yang

I updated that bug thread with a summary of the debugging efforts we have so far. If no one answers that bug thread, I will bring it up in a meeting next week. Thanks!

Thanks for your time and effort! I will test it one more time on GPU tomorrow, in case there are any other problems, before closing this thread. If there are no other problems, I will close it tomorrow. Thanks!