I want to build a model that translates English to French; this is my first attempt at machine translation.
I downloaded parallel subtitles: the first line of english.txt is the translation of the first line of french.txt, and so on line by line.
The code may still look terrible, sorry in advance.
import random
import string
from collections import Counter

import numpy as np
import spacy
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset
from unidecode import unidecode

def remove_unwanted(text: str) -> str:
    """
    Takes a string,
    replaces accented characters like 'é', 'ä', 'ü' with ASCII equivalents,
    removes punctuation,
    lowercases it,
    and returns the cleaned string.
    """
    text = unidecode(text)
    text = text.translate(str.maketrans("", "", string.punctuation))
    text = text.lower()
    return text
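For reference, this is roughly what I expect remove_unwanted to do (a tiny check on a made-up sentence, not part of the training script):
print(remove_unwanted("Héllo, Wörld!"))  # hello world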
def tokenize(text1: str, text2: str):
    """
    Takes two strings (English and French),
    tokenizes them with spaCy,
    and returns the tokenized Docs together with the two tokenizers.
    """
    tokenizer_1 = spacy.load("en_core_web_trf")
    tokenizer_2 = spacy.load("fr_dep_news_trf")
    text1 = text1.replace("\n", "")
    text2 = text2.replace("\n", "")
    tokenized_eng = tokenizer_1(text1.strip())
    tokenized_fre = tokenizer_2(text2.strip())
    return tokenized_eng, tokenized_fre, tokenizer_1, tokenizer_2
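And a quick sanity check of tokenize on a made-up pair (this assumes both spaCy models are already downloaded; the returned objects are spaCy Docs, which I iterate to get the tokens):
doc_en, doc_fr, nlp_en, nlp_fr = tokenize("the cat sleeps", "le chat dort")
print([tok.text for tok in doc_en])  # ['the', 'cat', 'sleeps']
print([tok.text for tok in doc_fr])  # ['le', 'chat', 'dort']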
def build_vocab(tokenized_sentences, min_freq=2):
    """
    Build a vocabulary for a language from a list of tokenized sentences.
    Args:
    - tokenized_sentences (list): A list of tokenized sentences.
    - min_freq (int): Minimum frequency for a word to be included in the vocabulary.
    Returns:
    - word_to_idx (dict): A dictionary mapping words to their corresponding indices.
    - idx_to_word (dict): A dictionary mapping indices to their corresponding words.
    """
    # Count word frequencies
    word_counts = Counter(tokenized_sentences)
    # Filter out words that occur less than min_freq times
    vocab = [word for word, count in word_counts.items() if count >= min_freq]
    # Create dictionaries for word-to-index and index-to-word mappings
    word_to_idx = {word: idx for idx, word in enumerate(vocab)}
    idx_to_word = {idx: word for idx, word in enumerate(vocab)}
    return word_to_idx, idx_to_word
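To show what I think build_vocab expects as input, a small sketch with a hand-made token list (tokens and counts are made up for illustration):
toy_tokens = ["the", "cat", "sat", "the", "cat", "ran", "the"]
w2i, i2w = build_vocab(toy_tokens, min_freq=2)
print(w2i)  # {'the': 0, 'cat': 1}
print(i2w)  # {0: 'the', 1: 'cat'}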
text_en = ""
with open("english.txt", "r", encoding="utf-8") as file:
    # Iterate through each line in the file and concatenate it to text_en
    for line in file:
        text_en += line.strip() + " "
text_en = remove_unwanted(text_en)

text_fr = ""
with open("french.txt", "r", encoding="utf-8") as file:
    # Iterate through each line in the file and concatenate it to text_fr
    for line in file:
        text_fr += line.strip() + " "
text_fr = remove_unwanted(text_fr)
en_word_to_idx, en_idx_to_word = build_vocab(text_en)
fr_word_to_idx, fr_idx_to_word = build_vocab(text_fr)
print(f"English Vocab size:\n{len(en_word_to_idx)}") #33
print(f"French Vocab size:\n{len(fr_word_to_idx)}")#35
My Encoder, Decoder, and Seq2Seq models:
class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=1):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers)

    def forward(self, input_seq):
        embedded = self.embedding(input_seq)
        outputs, (hidden, cell) = self.lstm(embedded)
        return hidden, cell
class Decoder(nn.Module):
    def __init__(self, output_size, hidden_size, num_layers=1):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, num_layers=num_layers)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, input_step, hidden, cell):
        embedded = self.embedding(input_step)
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        prediction = self.fc(output)
        return prediction, hidden, cell
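To convince myself the encoder and decoder shapes line up, I tried them on toy sizes (the numbers are made up; this uses the default seq-first layout of nn.LSTM and a single decoding step of shape (1, batch)):
enc = Encoder(input_size=10, hidden_size=16)
dec = Decoder(output_size=12, hidden_size=16)
src = torch.randint(0, 10, (7, 2))     # (seq_len=7, batch=2) of token ids
hidden, cell = enc(src)                # each has shape (num_layers=1, batch=2, hidden=16)
step = torch.randint(0, 12, (1, 2))    # one decoding step, shape (1, batch)
pred, hidden, cell = dec(step, hidden, cell)
print(pred.shape)                      # torch.Size([1, 2, 12])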
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        # Initialize variables to store the outputs
        target_len = target.shape[0]
        batch_size = target.shape[1]
        target_vocab_size = len(target.vocab)
        outputs = torch.zeros(target_len, batch_size, target_vocab_size).to(self.device)
        # Pass the source sequence through the encoder
        encoder_hidden, encoder_cell = self.encoder(source)
        # Initialize the decoder input with the SOS token
        decoder_input = target[0, :]
        # Loop through the target sequence one step at a time
        for t in range(1, target_len):
            output, encoder_hidden, encoder_cell = self.decoder(
                decoder_input, encoder_hidden, encoder_cell
            )
            outputs[t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            decoder_input = target[t] if teacher_force else top1
        return outputs
Data Splitting:
# Combine the two lists into pairs
parallel_sentences = list(zip(text_en, text_fr))
# Shuffle the pairs randomly
np.random.shuffle(parallel_sentences)
# Define the split percentages
train_split = 0.7
val_split = 0.1
test_split = 0.2
# Calculate the split indices
num_samples = len(parallel_sentences)
train_idx = int(train_split * num_samples)
val_idx = int((train_split + val_split) * num_samples)
# Split the data into training, validation, and test sets
train_data = parallel_sentences[:train_idx]
val_data = parallel_sentences[train_idx:val_idx]
test_data = parallel_sentences[val_idx:]
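Just as a sanity check on the split (nothing clever, it only prints the sizes):
print(len(train_data), len(val_data), len(test_data))  # ~70% / 10% / 20% of the pairs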
# Define a custom dataset class
class TranslationDataset(Dataset):
    def __init__(self, parallel_sentences, source_vocab, target_vocab):
        self.parallel_sentences = parallel_sentences
        self.source_vocab = source_vocab
        self.target_vocab = target_vocab

    def __len__(self):
        return len(self.parallel_sentences)

    def __getitem__(self, idx):
        source_sentence, target_sentence = self.parallel_sentences[idx]
        # Convert source and target sentences to numerical tensors using vocabularies
        source_tensor = [self.source_vocab[word] for word in source_sentence.split()]
        target_tensor = [self.target_vocab[word] for word in target_sentence.split()]
        return source_tensor, target_tensor
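A minimal sketch of how I understand __getitem__, using a made-up toy vocabulary (note the item comes back as plain Python lists, not tensors):
toy_pairs = [("the cat", "le chat")]
toy_en = {"the": 0, "cat": 1}
toy_fr = {"le": 0, "chat": 1}
toy_ds = TranslationDataset(toy_pairs, toy_en, toy_fr)
print(toy_ds[0])  # ([0, 1], [0, 1])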
# Build the training dataset from train_data, en_word_to_idx, and fr_word_to_idx
train_dataset = TranslationDataset(train_data, en_word_to_idx, fr_word_to_idx)
# Define batch size
batch_size = 32 # You can adjust the batch size
# Define a custom collate function for DataLoader
def collate_fn(batch):
    # Sort the batch by source sequence length (important for efficient padding)
    batch.sort(key=lambda x: len(x[0]), reverse=True)
    # Separate the source and target sequences
    source_seqs, target_seqs = zip(*batch)
    # Pad sequences to the length of the longest sequence in the batch
    padded_source_seqs = pad_sequence(source_seqs, batch_first=True)
    padded_target_seqs = pad_sequence(target_seqs, batch_first=True)
    return padded_source_seqs, padded_target_seqs
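My understanding of pad_sequence, checked on its own with made-up tensors (separately from my pipeline):
a = torch.tensor([1, 2, 3])
b = torch.tensor([4, 5])
print(pad_sequence([a, b], batch_first=True))
# tensor([[1, 2, 3],
#         [4, 5, 0]])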
# Create DataLoader instances with the custom collate function
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, drop_last=True, collate_fn=collate_fn)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
If you're still reading this messy code, thank you, you are a hero.
Finally, the training loop:
import torch.optim as optim
device = "cuda" if torch.cuda.is_available() else "cpu"
english_vocab_size = 33
french_vocab_size = 35
encoder = Encoder(english_vocab_size, 15).to(device)
decoder = Decoder(french_vocab_size, 16).to(device)
model = Seq2Seq(encoder=encoder, decoder=decoder, device=device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
epochs = 5
for epoch in range(epochs):
    total_loss = 0.0
    model.train()
    for batch_idx, (input_seq, target_seq) in enumerate(train_loader):
        input_seq = torch.LongTensor(input_seq).to(device)
        target_seq = torch.LongTensor(target_seq).to(device)
        optimizer.zero_grad()
        output_seq = model(input_seq, target_seq)
        loss = criterion(output_seq.view(-1, output_seq.shape[-1]), target_seq.view(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        if batch_idx % 100 == 0:
            print(f"Epoch [{epoch+1}/{epochs}] Batch [{batch_idx+1}/{len(train_loader)}] Loss: {loss.item():.4f}")
    average_loss = total_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{epochs}] Average Loss: {average_loss:.4f}")
torch.save(model.state_dict(), "model.pth")
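For later, this is how I plan to load the saved weights back (after rebuilding the same Encoder/Decoder/Seq2Seq objects):
model.load_state_dict(torch.load("model.pth"))
model.eval()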
Error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-12-7eb876b3744a> in <cell line: 17>()
20 model.train()
21
---> 22 for batch_idx, (input_seq, target_seq) in enumerate(train_loader):
23
24 input_seq = torch.LongTensor(input_seq).to(device)
/usr/local/lib/python3.10/dist-packages/torch/utils/data/dataloader.py in __next__(self)
631 # TODO(https://github.com/pytorch/pytorch/issues/76750)
632 self._reset() # type: ignore[call-arg]
--> 633 data = self._next_data()
634 self._num_yielded += 1
635 if self._dataset_kind == _DatasetKind.Iterable and \
/usr/local/lib/python3.10/dist-packages/torch/utils/data/dataloader.py in _next_data(self)
675 def _next_data(self):
676 index = self._next_index() # may raise StopIteration
--> 677 data = self._dataset_fetcher.fetch(index) # may raise StopIteration
678 if self._pin_memory:
679 data = _utils.pin_memory.pin_memory(data, self._pin_memory_device)
/usr/local/lib/python3.10/dist-packages/torch/utils/data/_utils/fetch.py in fetch(self, possibly_batched_index)
52 else:
53 data = self.dataset[possibly_batched_index]
---> 54 return self.collate_fn(data)
<ipython-input-11-0b974b8bfa01> in collate_fn(batch)
11
12 # Pad sequences to the length of the longest sequence in the batch
---> 13 padded_source_seqs = pad_sequence(source_seqs, batch_first=True)
14 padded_target_seqs = pad_sequence(target_seqs, batch_first=True)
15
/usr/local/lib/python3.10/dist-packages/torch/nn/utils/rnn.py in pad_sequence(sequences, batch_first, padding_value)
397 # assuming trailing dimensions and type of all the Tensors
398 # in sequences are same and fetching those from sequences[0]
--> 399 return torch._C._nn.pad_sequence(sequences, batch_first, padding_value)
400
401
TypeError: expected Tensor as element 0 in argument 0, but got str