Hey there, below is my DDP Trainer code. Everything runs all right: upon completion of training, a snapshot is saved on the nodes participating in training, and the saved snapshot is then loaded for inference. The inference code is also provided below. Since this is instruction-based fine-tuning, I have also included the data encoding (tokenization) class used to preprocess the data at the end of this topic.
If anything in the code is unclear or hard to follow, kindly reply to this topic and I will respond ASAP.
My thinking:
I suspect something is missing in the training step, or that I am doing the DDP training incorrectly.
Kindly take a look at it and help me out with this. Thank you very much.
Cmd used (2 machines; node_rank changes accordingly on each):
torchrun --nproc-per-node=1 --nnodes=2 --node_rank=0 --rdzv_id=123 --rdzv_endpoint=hostname:12345 --rdzv_backend=c10d Trainer_CPU 5 1
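On the second machine the command is identical except for the node rank (assuming the same rendezvous endpoint):
torchrun --nproc-per-node=1 --nnodes=2 --node_rank=1 --rdzv_id=123 --rdzv_endpoint=hostname:12345 --rdzv_backend=c10d Trainer_CPU 5 1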
Trainer Code:
import os
import time
import pandas as pd
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
from transformers import GPT2Tokenizer, GPTNeoForCausalLM

def ddp_setup():
    init_process_group(backend="gloo")
class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        save_every: int,
        snapshot_path: str,
        optimizer
    ) -> None:
        self.local_rank = int(os.environ["LOCAL_RANK"])
        self.global_rank = int(os.environ["RANK"])
        self.model = model
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
        self.epochs_run = 0
        self.snapshot_path = snapshot_path
        if os.path.exists(snapshot_path):
            print("Loading snapshot")
            self._load_snapshot(snapshot_path)
        self.model = DDP(self.model)
    def _load_snapshot(self, snapshot_path):
        snapshot = torch.load(snapshot_path, map_location=torch.device("cpu"))
        self.model.load_state_dict(snapshot["MODEL_STATE"])
        self.epochs_run = snapshot["EPOCHS_RUN"]
        print(f"Resuming training from snapshot at Epoch {self.epochs_run}")

    def _save_snapshot(self, epoch):
        snapshot = {
            "MODEL_STATE": self.model.module.state_dict(),
            "EPOCHS_RUN": epoch,
        }
        torch.save(snapshot, self.snapshot_path)
        torch.save(self.model, "model.pt")
        print(f"Epoch {epoch+1} | Training snapshot saved at {self.snapshot_path}")
    def _run_batch(self, source, targets):
        self.optimizer.zero_grad()
        output = self.model(source)
        loss = F.cross_entropy(output, targets)
        loss.backward()
        self.optimizer.step()

    def _custom_run_batch(self, source):
        self.optimizer.zero_grad()
        output = self.model(**source, labels=source["input_ids"].to(torch.long))
        loss = output.loss
        loss.backward()
        self.optimizer.step()
        print(f"CPU: {self.global_rank} | Loss: {loss}")
    def _run_epoch(self, epoch):
        b_sz = len(next(iter(self.train_data))["input_ids"])
        print(f"[CPU{self.global_rank}] Epoch {epoch+1} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
        self.train_data.sampler.set_epoch(epoch)
        for batch in self.train_data:
            batch = {k: v.to(f"cpu:{self.local_rank}") for k, v in batch.items()}
            self._custom_run_batch(batch)

    def train(self, max_epochs: int):
        for epoch in range(self.epochs_run, max_epochs):
            self._run_epoch(epoch)
            if (self.local_rank == 0 and epoch % self.save_every == 0) or (epoch == max_epochs - 1):
                self._save_snapshot(epoch)
#Need check specific about dataset and dataloader
def prepare_dataloader(dataset: Dataset, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,
        sampler=DistributedSampler(dataset)
    )
#GPT-Neo
#Loading the Dataset
def load_data(tokenizer):
    df = pd.read_csv("train_data.csv")
    X_train, y_train = df["document"].tolist(), df["summary"].tolist()
    train_dataset = Encoding(X_train, y_train, tokenizer, max_length=512)
    return train_dataset.data
def prepare_dataloader(dataset: list, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,
        sampler=DistributedSampler(dataset)
    )
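For reference, this is how I understand the DistributedSampler shards the data across the two ranks (a minimal standalone sketch with dummy data; num_replicas and rank are passed explicitly here only so it runs outside torchrun):

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# Dummy stand-in for the encoded dataset; with world_size=2 each rank is
# assigned roughly half of the samples, so len(loader) is the per-rank step
# count, not the total number of batches.
dummy_data = list(range(10))
loader = DataLoader(
    dummy_data,
    batch_size=2,
    shuffle=False,  # shuffling, if any, is delegated to the sampler
    sampler=DistributedSampler(dummy_data, num_replicas=2, rank=0),
)
print(len(loader))  # 3 -> ceil(10 / 2) samples on this rank, batch_size 2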
def llm_load_train_objs(batch_size):
    #Initialize model
    model_name = 'EleutherAI/gpt-neo-1.3B'
    tokenize = GPT2Tokenizer.from_pretrained(model_name, bos_token='', eos_token='', pad_token='', max_length=512)
    model = GPTNeoForCausalLM.from_pretrained(model_name)
    model.resize_token_embeddings(len(tokenize))
    for buffers in model.buffers():
        if buffers.dtype == torch.bool:
            buffers.data = buffers.data.type(torch.uint8)
    torch.manual_seed(42)
    train_data = load_data(tokenize)
    dataset = prepare_dataloader(train_data, batch_size)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    return model, dataset, optimizer
def main(save_every: int, total_epochs: int, batch_size: int, snapshot_path: str = "snapshot.pt"):
    ddp_setup()
    model, dataset, optimizer = llm_load_train_objs(batch_size)
    print(len(dataset))
    trainer = Trainer(model, dataset, save_every, snapshot_path, optimizer)
    start_time = time.time()
    trainer.train(total_epochs)
    end_time = time.time()
    print("Total Time taken: ", end_time - start_time, " seconds")
    destroy_process_group()

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
    parser.add_argument('save_every', type=int, help='How often to save a snapshot')
    parser.add_argument('--batch_size', default=2, type=int, help='Input batch size on each device (default: 2)')
    args = parser.parse_args()
    main(args.save_every, args.total_epochs, args.batch_size)
Inference code (using a Jupyter notebook here):
import torch
from transformers import GPT2Tokenizer, GPTNeoForCausalLM

model_name = "EleutherAI/gpt-neo-1.3B"
tokenize = GPT2Tokenizer.from_pretrained(model_name, bos_token='', eos_token='', pad_token='', max_length=512)
model = GPTNeoForCausalLM.from_pretrained(model_name)
model.resize_token_embeddings(len(tokenize))
for buffers in model.buffers():
    if buffers.dtype == torch.bool:
        buffers.data = buffers.data.type(torch.uint8)
torch.manual_seed(42)
snapshot_path = 'snapshot.pt'
snapshot = torch.load(snapshot_path, map_location=torch.device('cpu'))
model.load_state_dict(snapshot["MODEL_STATE"])
content = "toyota team europe were banned from the world rally championship."
instruct_prompt_2 = f'Summarize the given content:{content}\nSummary:'
generated = tokenize(f"{instruct_prompt_2}", return_tensors='pt').input_ids
for buffers in model.buffers():
    if buffers.dtype == torch.uint8:
        buffers.data = buffers.data.type(torch.bool)
sample_outputs = model.generate(generated, top_k=50, max_length=200)
predicted_text = tokenize.decode(sample_outputs[0])
print(predicted_text)
Data Encoding class:
class Encoding():
    def __init__(self, content, summary, tokenizer, max_length):
        self.input_ids = []
        self.attn_masks = []
        self.labels = summary
        self.data = []
        for content, summary in zip(content, summary):
            prep_txt = f'<start>content:{content}\nsummary:{summary}<stop>'
            #tokenize
            encodings_dict = tokenizer(prep_txt, truncation=True, max_length=max_length, padding="max_length")
            encodings_dict["input_ids"] = torch.tensor(encodings_dict["input_ids"]).to(torch.int32)
            encodings_dict["attention_mask"] = torch.tensor(encodings_dict["attention_mask"]).to(torch.int8)
            self.data.append(encodings_dict)

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return self.input_ids[idx], self.attn_masks[idx], self.labels[idx]
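Since load_data() returns train_dataset.data (a list of dicts), the DataLoader's default collate is what builds the batches that _custom_run_batch receives. Here is a minimal sketch of the resulting batch structure (dummy zero tensors, assuming a PyTorch version that exposes torch.utils.data.default_collate):

import torch
from torch.utils.data import default_collate

# Two dummy samples shaped like the dicts Encoding produces; only the
# shapes/dtypes matter here, the values are zeros.
samples = [
    {"input_ids": torch.zeros(512, dtype=torch.int32),
     "attention_mask": torch.zeros(512, dtype=torch.int8)}
    for _ in range(2)
]
batch = default_collate(samples)
print(batch["input_ids"].shape)       # torch.Size([2, 512])
print(batch["attention_mask"].dtype)  # torch.int8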
Thanks,
Mano