Training code
```python
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split

from Prepro import Data_orig

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


class RNNModel(nn.Module):
    def __init__(self, vocab_size, n_embd, n_hidden1, n_hidden2, block_size):
        super(RNNModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, n_embd)
        self.rnn = nn.RNN(input_size=n_embd, hidden_size=n_hidden1, num_layers=2,
                          batch_first=True, nonlinearity='tanh')
        self.fc1 = nn.Linear(n_hidden1 * block_size, n_hidden2)
        self.fc2 = nn.Linear(n_hidden2, vocab_size)
        self._init_weights()

    def _init_weights(self):
        nn.init.normal_(self.embedding.weight, mean=0, std=0.1)
        nn.init.xavier_uniform_(self.fc1.weight)
        nn.init.xavier_uniform_(self.fc2.weight)

    def forward(self, x):
        x = self.embedding(x)         # (batch_size, block_size, n_embd)
        x, _ = self.rnn(x)            # (batch_size, block_size, n_hidden1)
        x = x.reshape(x.size(0), -1)  # (batch_size, n_hidden1 * block_size)
        x = torch.tanh(self.fc1(x))   # (batch_size, n_hidden2)
        x = self.fc2(x)               # (batch_size, vocab_size)
        return x

# Model parameters
n_embd = 10
n_hidden1 = 500
n_hidden2 = 200
block_size = 3
g = torch.Generator().manual_seed(2147483647)

# Load the dataset
dataset = Data_orig()
X_train, X_test, y_train, y_test = train_test_split(
    dataset.X, dataset.Y, test_size=0.1, random_state=42)

# Initialize the model
model = RNNModel(vocab_size=dataset.vocab_size, n_embd=n_embd, n_hidden1=n_hidden1,
                 n_hidden2=n_hidden2, block_size=block_size).to(device)

# Define optimizer, loss function, and scheduler
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10000, gamma=0.5)

# Create a directory to save checkpoints
checkpoint_dir = "checkpoints"
os.makedirs(checkpoint_dir, exist_ok=True)

# Training loop with checkpoint saving
max_steps = 100000
batch_size = 50
checkpoint_interval = 10000  # Save every 10,000 steps

for i in range(max_steps):
    # Mini-batch sampling
    ix = torch.randint(0, X_train.shape[0], (batch_size,))
    x_batch = X_train[ix].to(device)
    y_batch = y_train[ix].to(device)

    # Forward pass
    optimizer.zero_grad()
    outputs = model(x_batch)
    loss = criterion(outputs, y_batch)

    # Backward pass and optimization
    loss.backward()
    optimizer.step()
    scheduler.step()

    # Track stats and print progress
    if i % checkpoint_interval == 0:
        print(f'Step {i}/{max_steps}: Loss = {loss.item():.4f}, Learning Rate = {scheduler.get_last_lr()[0]:.6f}')

    # Save checkpoint
    if i % checkpoint_interval == 0 and i > 0:
        checkpoint_path = os.path.join(checkpoint_dir, f'model_step_{i}.pth')
        torch.save({
            'step': i,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict(),
            'loss': loss.item(),
        }, checkpoint_path)
        print(f"Checkpoint saved at step {i} to {checkpoint_path}")

# Final save at the end of training
final_checkpoint_path = os.path.join(checkpoint_dir, 'model_final.pth')
torch.save({
    'step': max_steps,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'scheduler_state_dict': scheduler.state_dict(),
    'loss': loss.item(),
}, final_checkpoint_path)
print(f"Final model saved to {final_checkpoint_path}")
```
Sampling code
```python
import torch

from Prepro import Data_orig
from Rnn_generator import RNNModel


def sample_and_infer(data, checkpoint_path, device, start_str, num_samples, block_size):
    """
    Samples data and performs inference using a pre-trained model.

    Parameters:
        data (Data_orig): Preprocessed dataset.
        checkpoint_path (str): Path to the model checkpoint.
        device (torch.device): Device to use for the computation.
        start_str (str): Starting string for the text generation.
        num_samples (int): Number of characters to generate.
        block_size (int): Size of the input sequence.

    Returns:
        str: Generated text.
    """
    # Load pre-trained model
    model = RNNModel(vocab_size=data.vocab_size, n_embd=10, n_hidden1=500,
                     n_hidden2=200, block_size=block_size).to(device)
    checkpoint = torch.load(checkpoint_path, map_location=device, weights_only=True)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(device)
    model.eval()  # Set the model to evaluation mode

    # Convert starting string to tensor
    input_indices = torch.tensor([data.stoi[ch] for ch in start_str], device=device).unsqueeze(0)

    # Generate text
    generated = start_str
    for _ in range(num_samples):
        with torch.no_grad():
            emb = model.embedding(input_indices)
            rnn_out, _ = model.rnn(emb)
            # Note: fc1 expects n_hidden1 * block_size features, i.e. exactly
            # block_size input tokens in input_indices.
            rnn_out = rnn_out.reshape(rnn_out.size(0), -1)
            logits = model.fc2(torch.tanh(model.fc1(rnn_out)))
            probs = torch.softmax(logits, dim=-1)
            next_index = torch.multinomial(probs, num_samples=1).item()

        generated += data.itos[next_index]
        input_indices = torch.cat((input_indices, torch.tensor([[next_index]], device=device)), dim=1)
        input_indices = input_indices[:, -block_size:]

    return generated


# Usage example
dataset = Data_orig()
checkpoint_path = r"C:\Users\Rick\Name_generator\checkpoints\model_step_90000.pth"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
start_str = "a"
num_samples = 10
block_size = 3

generated_text = sample_and_infer(dataset, checkpoint_path, device, start_str, num_samples, block_size)
print("Generated Text:")
print(generated_text)
```
Now, when I execute the sampling code, instead of loading the pretrained weights it starts the training again from the beginning and saves new checkpoints.
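
Presumably this happens because the training code shown above lives at module level in Rnn_generator.py, so `from Rnn_generator import RNNModel` executes the whole training script on import. A minimal sketch of the usual guard (assuming the training file is Rnn_generator.py):

```python
# Keep the RNNModel class definition at module level so it can be imported,
# but run the training script only when the file is executed directly.
if __name__ == "__main__":
    # dataset loading, optimizer/scheduler setup, and the training loop go here
    ...
```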