Help with DDP in kaggle notebook

I am trying to do DDP in kaggle notebook as I don’t have multiple gpus on my local machine. Here is my code:

def main_worker(rank, world_size, arch: str = "resnet50",resume: str=None, start_epoch: int=0, EPOCHS=100,
                gpu: int=None, INPUT_SIZE=224, BATCH_SIZE=64, SAVE_CHKPT=True):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "5554"
    # torch.cuda.set_device(rank)
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    
    # device = "cuda" if torch.cuda.is_available() else "cpu"
    # create model
    print("=> creating model")
    # resnet = torchvision.models.resnet50()
    # backbone = nn.Sequential(*list(resnet.children())[:-1])
    model = MoCo(models.__dict__[arch], batch_size = BATCH_SIZE, mlp=True)
    model = DDP(model.to(rank), gpu_ids=[rank])
    # print(model.to(device))

  # define base learning rate following a linear scaling rule
    base_lr = 0.03 * BATCH_SIZE / 256
    wd = 1e-4
    momentum = 0.9

    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(
        model.parameters(),
        base_lr,
        momentum=momentum,
        weight_decay=wd)

    # optionally resume from a checkpoint
    if resume:
        if os.path.isfile(resume):
            print("=> loading checkpoint '{}'".format(resume))
            if gpu is None:
                checkpoint = torch.load(resume)
            else:
                # Map model to be loaded to specified single gpu.
                dist.barrier()
                loc = "cuda:{}".format(gpu)
                checkpoint = torch.load(resume, map_location=loc)
            start_epoch = checkpoint["epoch"]
            model.load_state_dict(checkpoint["state_dict"])
            optimizer.load_state_dict(checkpoint["optimizer"])
            print(
                "=> loaded checkpoint '{}' (epoch {})".format(
                    resume, checkpoint["epoch"]
                )
            )
        else:
            print("=> no checkpoint found at '{}'".format(resume))

    # Data loading code
    normalize = v2.Normalize(
        mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
    )
    augmentation = [
        v2.ToPILImage(),
        v2.RandomResizedCrop(INPUT_SIZE, scale=(0.2, 1.0)),
        v2.RandomApply(
            [v2.ColorJitter(0.4, 0.4, 0.4, 0.1)],
            p=0.8,  # not strengthened
        ),
        v2.RandomGrayscale(p=0.2),
        v2.RandomApply([GaussianBlur([0.1, 2.0])], p=0.5),
        v2.RandomHorizontalFlip(),
        v2.ToImage(),
        v2.ToDtype(torch.float, scale=True),
        normalize,
    ]

    transform = TwoCropsTransform(v2.Compose(augmentation))
    dataset = MyDataset(images_path, ds_new, transform = transform)

    dataloader = torch.utils.data.DataLoader(
        dataset,
        batch_size=BATCH_SIZE,
        shuffle=False,
        pin_memory = True,
        drop_last=True,
        num_workers=4,
        sampler = DistributedSampler(dataset)
    )

    for epoch in range(start_epoch, EPOCHS):
        adjust_learning_rate(optimizer, epoch, EPOCHS, base_lr)

        # mometum update: key network
        # momentum_val = cosine_schedule(epoch, EPOCHS, 0.996, 1)

        # train for one epoch
        train(dataloader, model, criterion, optimizer, epoch, rank)

        if SAVE_CHKPT and rank == 0:
            save_checkpoint(
                {
                    "epoch": epoch + 1,
                    "arch": arch,
                    "state_dict": model.module.state_dict(),
                    "optimizer": optimizer.state_dict(),
                },
                is_best=False,
                filename="checkpoint_{}.pth.tar".format(EPOCHS),
            )
    destroy_process_group()
def main():
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2, f"Requires 2 or more gpus to work, only found {n_gpus}"
    world_size = n_gpus
    mp.spawn(main_worker, args=(world_size,), nprocs=world_size, join=True)

if __name__ == "__main__":
    main()

When I run the code I get the error;

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/conda/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/opt/conda/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'main_worker' on <module '__main__' (built-in)>
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/conda/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/opt/conda/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'main_worker' on <module '__main__' (built-in)>
W1124 10:27:11.488000 134512000853824 torch/multiprocessing/spawn.py:146] Terminating process 519 via signal SIGTERM
---------------------------------------------------------------------------
ProcessExitedException                    Traceback (most recent call last)
Cell In[47], line 2
      1 if __name__ == "__main__":
----> 2     main(main_worker)

Cell In[46], line 8, in main(worker)
      6 assert n_gpus >= 2, f"Requires 2 or more gpus to work, only found {n_gpus}"
      7 world_size = n_gpus
----> 8 mp.spawn(worker, args=(world_size,), nprocs=world_size, join=True)

File /opt/conda/lib/python3.10/site-packages/torch/multiprocessing/spawn.py:282, in spawn(fn, args, nprocs, join, daemon, start_method)
    276     msg = (
    277         f"This method only supports start_method=spawn (got: {start_method}).\n"
    278         "To use a different start_method use:\n\t\t"
    279         " torch.multiprocessing.start_processes(...)"
    280     )
    281     warnings.warn(msg, FutureWarning, stacklevel=2)
--> 282 return start_processes(fn, args, nprocs, join, daemon, start_method="spawn")

File /opt/conda/lib/python3.10/site-packages/torch/multiprocessing/spawn.py:238, in start_processes(fn, args, nprocs, join, daemon, start_method)
    235     return context
    237 # Loop on join until it returns True or raises an exception.
--> 238 while not context.join():
    239     pass

File /opt/conda/lib/python3.10/site-packages/torch/multiprocessing/spawn.py:178, in ProcessContext.join(self, timeout)
    170         raise ProcessExitedException(
    171             "process %d terminated with signal %s" % (error_index, name),
    172             error_index=error_index,
   (...)
    175             signal_name=name,
    176         )
    177     else:
--> 178         raise ProcessExitedException(
    179             "process %d terminated with exit code %d" % (error_index, exitcode),
    180             error_index=error_index,
    181             error_pid=failed_process.pid,
    182             exit_code=exitcode,
    183         )
    185 with open(self.error_files[error_index], "rb") as fh:
    186     original_trace = pickle.load(fh)

ProcessExitedException: process 1 terminated with exit code 1

I am currently using T4 x2 gpu on kaggle notebook. Can anyone guide to solving this problem.

I finally got it to work. To get it to work, make sure all codes are in a single cell. The cell should begin with %%writefile <filename>.py where filename can be anything. ddp.py in my case. Then in the next cell, write !python <filename>.py. This should run the code. I am yet to figure out how to print out logs since the current implementation does not.

1 Like