DDP using gloo with 'uneven' datasets causes SIGABRT

I am trying to use DDP to scale up training inside a Singularity container, and I am stuck on training with 'uneven' datasets. I cannot split the data evenly across the various nodes, so I was hoping that one of the join-based approaches for uneven inputs would work, but no luck so far.
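For reference, the pattern I have been trying to follow is the documented DDP join() context manager for uneven inputs, where ranks that run out of data keep shadowing the collective calls of the ranks that are still training. A minimal sketch of that pattern (the names join_sketch, model, and loader are placeholders, not my actual code):

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def join_sketch(rank: int, model: torch.nn.Module, loader):
    ddp_model = DDP(model.to(rank), device_ids=[rank])
    # ranks that exhaust their loader early shadow the allreduces of the
    # ranks that still have batches, so nobody hangs on a collective
    with ddp_model.join():
        for x, y in loader:
            out = ddp_model(x.to(rank))
            loss = torch.nn.functional.mse_loss(out, y.to(rank))
            loss.backward()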

I create the individual processes following a DDP tutorial:

import os
from typing import Any

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader

def ddp_setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def ddp_cleanup():
    dist.destroy_process_group()

Note that using “nccl” in init_process_group causes the following scripts to hang forever.
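(Probably a separate issue, but for completeness: my understanding is that NCCL wants each rank pinned to its own GPU before the process group is used. A sketch of that kind of setup, assuming one GPU per rank, is below; gloo is the backend I actually need working here.)

def ddp_setup_nccl(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    # pin this process to its own GPU before creating the process group
    torch.cuda.set_device(rank)
    dist.init_process_group("nccl", rank=rank, world_size=world_size)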

Here is the function that is spawned:

def ddp_train_basic(rank: int,
                    world_size: int,
                    params_data: dict[str, Any],
                    params_networks: dict[str, Any],
                    params_train: dict[str, Any],
                    new_filenames: dict[int, list[str]]
                    ):
    print("Running basic DDP training on rank {:d}.".format(rank))
    ddp_setup(rank, world_size)

    tm = TrainMachine(params_data=params_data,
                      params_networks=params_networks,
                      params_train=params_train)  # class for doing setup - this works

    tm.load_dataset()
    for ds in tm.dataset.datasets:  # move the data to the correct GPU
        for x in ['s', 'q']:
            setattr(ds, x, getattr(ds, x).double().to(rank))

    tm.create_networks()
    networks = [DDP(net.to(rank), device_ids=[rank]) for net in tm.networks]

    # batch_size and shuffle_train (like lr and epochs below) are defined
    # in setup code that has been abstracted away here
    data_loader_train = DataLoader(tm.dataset.subsets['train'],
                                   batch_size=batch_size,
                                   shuffle=shuffle_train, num_workers=0,
                                   drop_last=False)
    data_loader_test = DataLoader(tm.dataset.subsets['test'],
                                  batch_size=1000, shuffle=False,
                                  num_workers=0, drop_last=False)
    optimizers = []
    for i, net in enumerate(networks):
        optimizer = torch.optim.Adam(net.parameters(), lr=lr)
        optimizers.append(optimizer)

    ## basic training loop
    for epoch in range(epochs):
        for network in networks:  # set into train mode
            network.train()
        with networks[0].join() and networks[1].join():
            for i, data in enumerate(data_loader_train):
                # get the inputs; data is a list of [inputs, labels]
                x_s, x_b = data
                x_s = x_s.double()
                x_b = x_b.double()

                # zero the parameter gradients
                for optimizer in optimizers:
                    optimizer.zero_grad(set_to_none=True)
                s_out = torch.squeeze(networks[0](x_s))
                b_out = torch.squeeze(networks[1](x_b))
                if s_out.shape[0] == data_loader_train.batch_size:
                    loss = torch.pow(s_out - b_out, 2).mean()
                    loss.backward()
                    for optimizer in optimizers:
                        optimizer.step()
            # after each epoch...
            if rank == 0:  # rank 0 prints without the leading tab
                sstr = 'Epoch #{:4d} complete on rank {:d}'.format(epoch, rank)
            else:
                sstr = '\tEpoch #{:4d} complete on rank {:d}'.format(epoch, rank)
            print(sstr)

    ddp_cleanup()
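The intent of that with statement is to enter the join context of both DDP-wrapped networks at once. If I understand the docs correctly, the way to combine several joinables is the Join helper, roughly like this sketch (same placeholder names as above):

from torch.distributed.algorithms import Join

# one shared join context covering both DDP-wrapped networks
with Join([networks[0], networks[1]]):
    for x_s, x_b in data_loader_train:
        ...

I have kept the original with ... and ... form above because that is what actually produced the error below.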

Unfortunately, I’ve had to abstract away a bunch of information that might be helpful, but I’m hoping someone will recognize this issue. When I run the script above using the with context manager, I get the following error:

Epoch #   0 complete on rank 0
terminate called after throwing an instance of 'gloo::EnforceNotMet'
  what():  [enforce fail at .../third_party/gloo/gloo/transport/tcp/pair.cc:510] op.preamble.length <= op.nbytes. 262760 vs 4
Traceback (most recent call last):
  File "/usr/src/app/train_multifile_DDP.py", line 393, in <module>
    run_parallel_training(ddp_train_basic,
  File "/usr/src/app/train_multifile_DDP.py", line 385, in run_parallel_training
    mp.spawn(train_fn,
  File "/usr/local/lib/python3.10/dist-packages/torch/multiprocessing/spawn.py", line 239, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/usr/local/lib/python3.10/dist-packages/torch/multiprocessing/spawn.py", line 197, in start_processes
    while not context.join():
  File "/usr/local/lib/python3.10/dist-packages/torch/multiprocessing/spawn.py", line 140, in join
    raise ProcessExitedException(
torch.multiprocessing.spawn.ProcessExitedException: process 0 terminated with signal SIGABRT

The problem appears to be that op.preamble.length is far larger than op.nbytes. I have no idea what these are or why they should be related in this way.

That function is called in the following way:

def run_parallel_training(train_fn,
                          world_size: int,
                          params_data: dict[str, Any],
                          params_networks: dict[str, Any],
                          params_train: dict[str, Any],
                          new_filenames: dict[int, list[str]]
                          ):
    mp.spawn(train_fn,
             args=(world_size, params_data,
                   params_networks, params_train, new_filenames),
             nprocs=world_size,
             join=True)


if __name__ == "__main__":
    run_parallel_training(ddp_train_basic,
                          world_size=5,
                          params_data=params_data,
                          params_networks=params_networks,
                          params_train=params_train,
                          new_filenames=NEW_FILENAMES
                          )

Watching memory use suggests that I am nowhere near running out of memory, and I cannot replicate this error in simpler scripts either.
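For context, the simpler scripts are along these lines (only a sketch of their general shape, not the exact code): a toy model, deliberately uneven per-rank batch counts, gloo backend. These run to completion without the gloo error.

def ddp_toy_train(rank, world_size):
    ddp_setup(rank, world_size)
    model = DDP(torch.nn.Linear(8, 1).double().to(rank), device_ids=[rank])
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    n_batches = 10 + rank  # deliberately uneven across ranks
    with model.join():
        for _ in range(n_batches):
            x = torch.randn(4, 8, dtype=torch.double, device=rank)
            loss = model(x).pow(2).mean()
            optimizer.zero_grad(set_to_none=True)
            loss.backward()
            optimizer.step()
    ddp_cleanup()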

Can someone shed some light on what this means and why it might be happening?