I am trying to use DDP (DistributedDataParallel) to scale up training inside a Singularity container, and I am stuck on training with ‘uneven’ datasets. I cannot split the data evenly across the different processes, so I was hoping that one of the join-based approaches for uneven inputs would work, but no luck so far.
I create the individual processes following a DDP tutorial:
def ddp_setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def ddp_cleanup():
    dist.destroy_process_group()
Note that using "nccl" in init_process_group causes the following scripts to hang forever.
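In case it is relevant, the nccl variant of the setup that I would expect to need looks like the sketch below; the explicit per-rank device pinning is my assumption about what nccl requires, not something that is in the script that produces the error.

# Sketch only: nccl variant of ddp_setup. torch.cuda.set_device(rank) is an
# assumption (one GPU pinned per process), not part of my current script.
import os
import torch
import torch.distributed as dist

def ddp_setup_nccl(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    torch.cuda.set_device(rank)  # pin this process to GPU `rank`
    dist.init_process_group("nccl", rank=rank, world_size=world_size)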
Here is the function that is spawned:
def ddp_train_basic(rank: int,
                    world_size: int,
                    params_data: dict[str, Any],
                    params_networks: dict[str, Any],
                    params_train: dict[str, Any],
                    new_filenames: dict[int, list[str]]
                    ):
print("Running basic DDP training on rank {:d}.".format(rank))
ddp_setup(rank, world_size)
tm = TrainMachine(params_data=pd,
params_networks=params_networks,
params_train=pt) # class for doing setup - this works
tm.load_dataset()
for ds in tm.dataset.datasets: # move the data to the correct GPU
for x in ['s', 'q']:
setattr(ds, x, getattr(ds, x).double().to(rank))
tm.create_networks()
networks = [DDP(net.to(rank), device_ids=[rank]) for net in tm.networks]
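    # note: batch_size, shuffle_train, lr, and epochs are defined elsewhere in my
    # real script; they are among the details abstracted away in this excerpt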
    data_loader_train = DataLoader(tm.dataset.subsets['train'],
                                   batch_size=batch_size,
                                   shuffle=shuffle_train, num_workers=0,
                                   drop_last=False)
    data_loader_test = DataLoader(tm.dataset.subsets['test'],
                                  batch_size=1000, shuffle=False,
                                  num_workers=0, drop_last=False)
    optimizers = []
    for i, net in enumerate(networks):
        optimizer = torch.optim.Adam(net.parameters(), lr=lr)
        optimizers.append(optimizer)

    ## basic training loop
    for epoch in range(epochs):
        for network in networks:  # set into train mode
            network.train()
        with networks[0].join() and networks[1].join():
            for i, data in enumerate(data_loader_train):
                # get the inputs; data is a list of [inputs, labels]
                x_s, x_b = data
                x_s = x_s.double()
                x_b = x_b.double()
                # zero the parameter gradients
                for optimizer in optimizers:
                    optimizer.zero_grad(set_to_none=True)
                s_out = torch.squeeze(networks[0](x_s))
                b_out = torch.squeeze(networks[1](x_b))
                if s_out.shape[0] == data_loader_train.batch_size:
                    loss = torch.pow(s_out - b_out, 2).mean()
                    loss.backward()
                    for optimizer in optimizers:
                        optimizer.step()
        # after each epoch...
        if rank == 0:  # only for rank 0
            sstr = 'Epoch #{:4d} complete on rank {:d}'.format(epoch, rank)
        else:
            sstr = '\tEpoch #{:4d} complete on rank {:d}'.format(epoch, rank)
        print(sstr)
    ddp_cleanup()
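For context, what I was trying to approximate with the two nested join() calls is, I believe, the generic Join context manager from the uneven-inputs tutorial, which wraps several DDP modules at once. A rough sketch of what I mean (not my actual code, and only included in case the way I combine the two join() contexts is itself part of the problem):

# Rough sketch only: wrapping both DDP networks in a single Join context
# instead of combining their individual .join() context managers.
from torch.distributed.algorithms.join import Join

with Join([networks[0], networks[1]]):
    for i, data in enumerate(data_loader_train):
        ...  # same training step as in the loop above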
Unfortunately, I’ve had to abstract away a fair amount of information that might be helpful, but I’m hoping someone will recognize the issue. When I run the script above with the join() context managers in place, I get the following error:
Epoch #   0 complete on rank 0
terminate called after throwing an instance of 'gloo::EnforceNotMet'
  what():  [enforce fail at …/third_party/gloo/gloo/transport/tcp/pair.cc:510] op.preamble.length <= op.nbytes. 262760 vs 4
Traceback (most recent call last):
  File "/usr/src/app/train_multifile_DDP.py", line 393, in <module>
    run_parallel_training(ddp_train_basic,
  File "/usr/src/app/train_multifile_DDP.py", line 385, in run_parallel_training
    mp.spawn(train_fn,
  File "/usr/local/lib/python3.10/dist-packages/torch/multiprocessing/spawn.py", line 239, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/usr/local/lib/python3.10/dist-packages/torch/multiprocessing/spawn.py", line 197, in start_processes
    while not context.join():
  File "/usr/local/lib/python3.10/dist-packages/torch/multiprocessing/spawn.py", line 140, in join
    raise ProcessExitedException(
torch.multiprocessing.spawn.ProcessExitedException: process 0 terminated with signal SIGABRT
The problem appears to be that op.preamble.length is far larger than op.nbytes. I have no idea what these are or why they should be related in this way.
That function is called in the following way:
def run_parallel_training(train_fn,
                          world_size: int,
                          params_data: dict[str, Any],
                          params_networks: dict[str, Any],
                          params_train: dict[str, Any],
                          new_filenames: dict[int, list[str]]
                          ):
    mp.spawn(train_fn,
             args=(world_size, params_data,
                   params_networks, params_train, new_filenames),
             nprocs=world_size,
             join=True)
if __name__ == "__main__":
    run_parallel_training(ddp_train_basic,
                          world_size=5,
                          params_data=params_data,
                          params_networks=params_networks,
                          params_train=params_train,
                          new_filenames=NEW_FILENAMES
                          )
I cannot replicate this error in simpler scripts, either. Watching memory usage suggests that I am nowhere near running out of memory.
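For reference, the simpler scripts I tested look roughly like the CPU-only sketch below (names, sizes, and ports are made up for illustration): one small model per process, gloo backend, and deliberately uneven numbers of batches per rank. A script of this shape runs cleanly for me, which is why I suspect something specific to my real setup.

# Illustrative only: the kind of minimal script that does NOT reproduce the error.
# CPU-only, gloo backend, deliberately uneven batch counts per rank.
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

def _worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12356'
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
    model = DDP(torch.nn.Linear(8, 1).double())  # CPU model, no device_ids
    opt = torch.optim.Adam(model.parameters(), lr=1e-3)
    with model.join():                           # handles uneven inputs across ranks
        for _ in range(10 + rank):               # rank k runs 10 + k batches
            opt.zero_grad(set_to_none=True)
            loss = model(torch.randn(4, 8, dtype=torch.double)).pow(2).mean()
            loss.backward()
            opt.step()
    dist.destroy_process_group()

if __name__ == "__main__":
    mp.spawn(_worker, args=(2,), nprocs=2, join=True)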
Can someone shed some light on what this means and why it might be happening?