RuntimeError: Expected to mark a variable ready only once. This error is caused by one of the following reasons: 1) Use of a module parameter outside the `forward` function. Please make sure model parameters are not shared across multiple concurrent forward-backward passes. or try to use _set_static_graph() as a workaround if this module graph does not change during training loop.2) Reused parameters in multiple reentrant backward passes. For example, if you use multiple `checkpoint` functions to wrap the same part of your model, it would result in the same set of parameters been used by different reentrant backward passes multiple times, and hence marking a variable ready multiple times. DDP does not support such use cases in default. You can try to use _set_static_graph() as a workaround if your module graph does not change over iterations.
Parameter at index 206 with name bin_layer.bias has been marked as ready twice. This means that multiple autograd engine hooks have fired for this particular parameter during this iteration.
import math
from typing import Optional, Tuple
from transformers import AdamW, get_linear_schedule_with_warmup, AutoConfig
from transformers import BertForPreTraining, BertModel, RobertaModel, AlbertModel, AlbertForMaskedLM, RobertaForMaskedLM
import torch
import torch.nn as nn
import pytorch_lightning as pl
from sklearn.metrics import f1_score
from dataclasses import dataclass
import pytorch_lightning as pl
from transformers import AdamW, get_linear_schedule_with_warmup, AutoConfig
@dataclass
class ModelOutput():
loss: Optional[torch.FloatTensor] = None
all_loss: Optional[list] = None
loss_nums: Optional[list] = None
prediction_logits: torch.FloatTensor = None
seq_relationship_logits: torch.FloatTensor = None
tri_label_logits: torch.FloatTensor = None
reg_label_logits: torch.FloatTensor = None
hidden_states: Optional[Tuple[torch.FloatTensor]] = None
attentions: Optional[Tuple[torch.FloatTensor]] = None
class AlignScore(nn.Module):
def __init__(self, model='roberta-base',
using_pretrained=True,
*args,
**kwargs):
super(AlignScore, self).__init__()
# self.save_hyperparameters()
if 'roberta' in model:
if using_pretrained:
self.base_model = RobertaModel.from_pretrained(model)
self.mlm_head = RobertaForMaskedLM.from_pretrained(model).lm_head
else:
self.base_model = RobertaModel(AutoConfig.from_pretrained(model))
self.mlm_head = RobertaForMaskedLM(AutoConfig.from_pretrained(model)).lm_head
self.bin_layer = nn.Linear(self.base_model.config.hidden_size, 2)
self.dropout = nn.Dropout(p=0.1)
self.mlm_loss_factor = 0.5
self.need_mlm = True
self.is_finetune = False
self.ce_loss_fct1 =nn.CrossEntropyLoss(reduction='sum')
self.ce_loss_fct2 =nn.CrossEntropyLoss(reduction='sum')
def mse_loss(self, input, target, ignored_index=-100.0, reduction='mean'):
mask = (target == ignored_index)
out = (input[~mask]-target[~mask])**2
if reduction == "mean":
return out.mean()
elif reduction == "sum":
return out.sum()
def forward(self,batch):
# print(batch)
base_model_output = self.base_model(
input_ids = batch['input_ids'],
attention_mask = batch['attention_mask'])
prediction_scores = self.mlm_head(base_model_output.last_hidden_state) ## sequence_output for mlm
seq_relationship_score = self.bin_layer(self.dropout(base_model_output.pooler_output)) ## pooled output for classification
total_loss = None
if 'mlm_label' in batch.keys():
masked_lm_loss = self.ce_loss_fct1(prediction_scores.view(-1, self.base_model.config.vocab_size), batch['mlm_label'].view(-1))
next_sentence_loss = self.ce_loss_fct2(seq_relationship_score.view(-1, 2), batch['align_label'].view(-1)) / math.log(2)
masked_lm_loss_num = torch.sum(batch['mlm_label'].view(-1) != -100)
next_sentence_loss_num = torch.sum(batch['align_label'].view(-1) != -100)
return ModelOutput(
loss=total_loss,
all_loss=[masked_lm_loss, next_sentence_loss, ] if 'mlm_label' in batch.keys() else None,
loss_nums=[masked_lm_loss_num, next_sentence_loss_num,] if 'mlm_label' in batch.keys() else None,
prediction_logits=prediction_scores,
seq_relationship_logits=seq_relationship_score
)
import os
os.environ['TORCH_DISTRIBUTED_DEBUG'] = 'DETAIL'
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os
from src.summary_dataset import DSTDataSet
from src.summary_align_model import AlignScore
def ddp_setup(rank, world_size):
"""
Args:
rank: Unique identifier of each process
world_size: Total number of processes
"""
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "12355"
init_process_group(backend="nccl", rank=rank, world_size=world_size)
torch.cuda.set_device(rank)
import json
from tqdm import tqdm
def calculate_forward_loss(model_output):
losses = model_output.all_loss
loss_nums = model_output.loss_nums
assert len(loss_nums) == len(losses), 'loss_num should be the same length as losses'
loss_mlm_num = torch.sum(loss_nums[0])
loss_bin_num = torch.sum(loss_nums[1])
loss_mlm = torch.sum(losses[0]) / loss_mlm_num if loss_mlm_num > 0 else 0.
loss_bin = torch.sum(losses[1]) / loss_bin_num if loss_bin_num > 0 else 0.
total_loss = 0.5 * loss_mlm + loss_bin
return total_loss
def train(model,epochs,train_loader,val_loader,ckpt_dir='./'):
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
for epoch in range(1, epochs):
with tqdm(train_loader, unit="batch") as tepoch:
for i,batch in enumerate(tepoch):
tepoch.set_description(f"Epoch {epoch}")
optimizer.zero_grad()
model_output = model(batch)
_loss = calculate_forward_loss(model_output)
_loss.backward()
print(align_model.bin_layer.bias.grad)
import pdb;pdb.set_trace()
optimizer.step()
tepoch.set_postfix(train_loss=_loss.item())
def main(rank: int,
world_size: int,
save_every: int,
total_epochs: int,
batch_size: int):
ddp_setup(rank, world_size)
datapath='_sample_test.json'
data = json.load(open(datapath))
train_eval_split = 0.90
need_mlm = True
model_name='roberta-base'
train_ds = DSTDataSet(dataset=data[:int(train_eval_split*len(data))], model_name=model_name, need_mlm=need_mlm)
val_ds = DSTDataSet(dataset=data[int(train_eval_split*len(data)):], model_name=model_name, need_mlm=need_mlm)
train_dl = DataLoader(train_ds, batch_size=batch_size, pin_memory=True,shuffle=False,sampler=DistributedSampler(train_ds))
val_dl = DataLoader(val_ds, batch_size=batch_size, shuffle=False,sampler=DistributedSampler(val_ds))
model = AlignScore(model_name).to(rank)
print(model)
model = DDP(model, device_ids=[rank],find_unused_parameters=True,static_graph=False)
train(model,3,train_dl,val_dl)
destroy_process_group()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='simple distributed training job')
parser.add_argument('--total_epochs',default=4, type=int, help='Total epochs to train the model')
parser.add_argument('--save_every',default=1, type=int, help='How often to save a snapshot')
parser.add_argument('--batch_size', default=4, type=int, help='Input batch size on each device (default: 32)')
args = parser.parse_args()
world_size = torch.cuda.device_count()
print("total-gpus:",world_size)
mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)```