I have a class that inherits from MyModel (its parent class), which is a relatively large model:
import os
import torch
import torch_optimizer

class DynamicModel(MyModel):
    def __init__(self, num_inputs, num_outputs, h_dim=96, z_dim=48, n_layers=2,
                 n_mixtures=10,
                 device=torch.device('cuda' if torch.cuda.is_available() else 'cpu'),
                 normalizer_input=None, normalizer_output=None,
                 *args, **kwargs):
        super(DynamicModel, self).__init__(u_dim=num_inputs, y_dim=num_outputs,
                                           h_dim=h_dim, z_dim=z_dim,
                                           n_layers=n_layers, n_mixtures=n_mixtures,
                                           device=device)
        # Save parameters
        self.num_inputs = num_inputs
        self.num_outputs = num_outputs
        self.args = args
        self.kwargs = kwargs
        self.normalizer_input = normalizer_input
        self.normalizer_output = normalizer_output
        self.to(device)

    @property
    def num_model_inputs(self):
        # self.ar (autoregressive flag) comes from the parent MyModel
        return self.num_inputs + self.num_outputs if self.ar else self.num_inputs

    def forward(self, u, y=None):
        if self.normalizer_input is not None:
            u = self.normalizer_input.normalize(u)
        if y is not None and self.normalizer_output is not None:
            y = self.normalizer_output.normalize(y)
        loss = super(DynamicModel, self).forward(u, y)
        return loss

    def generate(self, u, y=None):
        if self.normalizer_input is not None:
            u = self.normalizer_input.normalize(u)
        y_sample, y_sample_mu, y_sample_sigma = super(DynamicModel, self).generate(u)
        if self.normalizer_output is not None:
            y_sample = self.normalizer_output.unnormalize(y_sample)
            y_sample_mu = self.normalizer_output.unnormalize_mean(y_sample_mu)
            y_sample_sigma = self.normalizer_output.unnormalize_sigma(y_sample_sigma)
        return y_sample, y_sample_mu, y_sample_sigma
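For reference, the normalizer objects are simple standardizers; a simplified sketch of the interface they expose (the actual class is longer):

class Normalizer:
    """Simplified sketch of my normalizer interface (not the real class)."""
    def __init__(self, mean, std):
        self.mean, self.std = mean, std
    def normalize(self, x):
        return (x - self.mean) / self.std
    def unnormalize(self, x):
        return x * self.std + self.mean
    def unnormalize_mean(self, mu):
        return mu * self.std + self.mean
    def unnormalize_sigma(self, sigma):
        # Only the scale is affected for the standard deviation
        return sigma * self.std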
class ModelState:
    def __init__(self,
                 seed,
                 nu,
                 ny,
                 h_dim=56,
                 z_dim=48,
                 n_layers=2,
                 n_mixtures=8,
                 device=torch.device('cuda' if torch.cuda.is_available() else 'cpu'),
                 optimizer_type="AdamW",
                 **kwargs):
        torch.manual_seed(seed)
        self.h_dim = h_dim
        self.z_dim = z_dim
        self.n_layers = n_layers
        self.n_mixtures = n_mixtures
        self.model = DynamicModel(num_inputs=nu, num_outputs=ny, h_dim=h_dim,
                                  z_dim=z_dim, n_layers=n_layers,
                                  n_mixtures=n_mixtures, device=device, **kwargs)
        # Optimization parameters
        if optimizer_type == "AdaBelief":
            self.optimizer = torch_optimizer.AdaBelief(self.model.parameters(),
                                                       lr=1e-4,
                                                       betas=(0.9, 0.999),
                                                       eps=1e-6,
                                                       weight_decay=0)
        elif optimizer_type == "AdamW":
            self.optimizer = torch.optim.AdamW(self.model.parameters(),
                                               lr=1e-4,
                                               betas=(0.9, 0.999))
        else:
            yogi = torch_optimizer.Yogi(self.model.parameters(), lr=0.5e-4,
                                        betas=(0.95, 0.999), eps=1e-3,
                                        initial_accumulator=1e-6, weight_decay=0)
            self.optimizer = torch_optimizer.Lookahead(yogi, k=5, alpha=0.5)
    def load_model(self, path, name='my_model.pt', map_location=None):
        file = path if os.path.isfile(path) else os.path.join(path, name)
        try:
            if map_location is None:
                ckpt = torch.load(file, map_location=lambda storage, loc: storage)
            else:
                ckpt = torch.load(file, map_location=map_location)
        except (FileNotFoundError, NotADirectoryError):
            raise Exception("Could not find model: " + file)
        self.model.load_state_dict(ckpt["model"])
        self.optimizer.load_state_dict(ckpt["optimizer"])
        epoch = ckpt['epoch']
        return epoch
    def save_model(self, epoch, vloss, elapsed_time, path, name='my_model.pt'):
        if not os.path.exists(path):
            os.makedirs(path)
        torch.save({
            'epoch': epoch,
            'model': self.model.state_dict(),
            'optimizer': self.optimizer.state_dict(),
            'vloss': vloss,
            'elapsed_time': elapsed_time,
        }, os.path.join(path, name))
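For completeness, single-GPU training with these classes looks roughly like this (a sketch; the tensors, dimensions, and path are placeholders):

# Hypothetical single-process usage; u_batch and y_batch are placeholder tensors
modelstate = ModelState(seed=1234, nu=3, ny=2)
loss = modelstate.model(u_batch, y_batch)  # forward() returns the loss directly
modelstate.optimizer.zero_grad()
loss.backward()
modelstate.optimizer.step()
modelstate.save_model(epoch=0, vloss=loss.item(), elapsed_time=0.0, path='checkpoints')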
Next, I tried to use DistributedDataParallel to train the model on multiple GPUs.
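For context, the snippet below runs inside each spawned process after the usual process-group setup; a rough sketch of that setup (rank and args.world_size come from the launcher, and seed_worker and g are my seeding helpers, following the standard PyTorch reproducibility recipe):

import random
import numpy as np
import torch.distributed as dist

# Per-process setup (sketch)
dist.init_process_group(backend='nccl', init_method='env://',
                        world_size=args.world_size, rank=rank)
current_device = rank % torch.cuda.device_count()
torch.cuda.set_device(current_device)

def seed_worker(worker_id):
    # Deterministically re-seed each DataLoader worker
    worker_seed = torch.initial_seed() % 2**32
    random.seed(worker_seed)
    np.random.seed(worker_seed)

g = torch.Generator()
g.manual_seed(seed)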
from torch.utils.data import DataLoader, TensorDataset

batch_size = args.batch_size
train_dataset = TensorDataset(train_x, train_y)
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
                                                                num_replicas=args.world_size,
                                                                rank=rank,
                                                                shuffle=True)
train_loader = DataLoader(train_dataset,
                          batch_size=batch_size,
                          shuffle=False,  # the sampler handles shuffling
                          num_workers=args.num_workers,
                          sampler=train_sampler,
                          worker_init_fn=seed_worker,
                          generator=g)
normalizer_input, normalizer_output = compute_normalizer(train_loader)

# Define model
modelstate = ModelState(seed=seed,
                        nu=u_dim,
                        ny=y_dim,
                        #normalizer_input=normalizer_input,
                        #normalizer_output=normalizer_output
                        )
modelstate.model.cuda()
modelstate.model = torch.nn.parallel.DistributedDataParallel(modelstate.model,
                                                             device_ids=[current_device])
print('passed distributed data parallel call')
When I trained the model with the distributed data on a cluster, this error interrupted training before the first epoch finished:
Traceback (most recent call last):
    loss_ = modelstate.model(u, y)
  File "/home/lib/python3.8/site-packages/torch/nn/modules/module.py", line 1194, in _call_impl
    return forward_call(*input, **kwargs)
  File "/home/lib/python3.8/site-packages/torch/nn/parallel/distributed.py", line 1026, in forward
    if torch.is_grad_enabled() and self.reducer._rebuild_buckets():
RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by making sure all `forward` function outputs participate in calculating loss.
If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return value of `forward` of your module when reporting this issue (e.g. list, dict, iterable).
Parameter indices which did not receive grad for rank 9: 12 13 26 27 79 80 81 82 83 84 85 86 87 88
In addition, you can set the environment variable TORCH_DISTRIBUTED_DEBUG to either INFO or DETAIL to print out information about which particular parameters did not receive gradient on this rank as part of this error
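The message suggests enabling unused-parameter detection; the wrapper call would then look like this (a sketch of the error's own suggestion, which I have not confirmed addresses the root cause):

modelstate.model = torch.nn.parallel.DistributedDataParallel(
    modelstate.model,
    device_ids=[current_device],
    find_unused_parameters=True,  # suggested by the error message
)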
I would truly appreciate it if someone could provide a solution.