import os
import json
import numpy as np
import torch
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import pickle
# Constants
OUTPUT_DIM = 126  # NUM_LANDMARKS * 3: each landmark point has 3 coordinates (x, y, z)
HIDDEN_DIM = 126
NUM_LAYERS = 4
BATCH_SIZE = 32
EPOCHS = 2
LEARNING_RATE = 0.001
MAX_LENGTH = 100  # Maximum sentence length
NUM_LANDMARKS = 42  # Number of landmark points
EOS_TOKEN = "<EOS>"
DEBUG = False
MODEL_NAME = "sign_language_model_CSLT_MMDA_edu_5082024"
ROOT_FOLDER = r"D:\Downloads\Constient\sign-motion-regeneration\data"
VOCAB_NAME = "vocab_5082024.pkl"
class LoadData:
    def __init__(self, NUM_LANDMARKS=42, DEBUG=False, NUM_SENTENCES=2):
        self.NUM_LANDMARKS = NUM_LANDMARKS
        self.DEBUG = DEBUG
        self.DEBUG_LIMIT = NUM_SENTENCES

    def load_landmarks_from_files(self, root_folder):
        landmarks = []
        labels = []
        for limit, class_name in enumerate(os.listdir(root_folder)):
            annotation_text = os.path.join(root_folder, class_name)
            print("=" * 42)
            print(annotation_text, limit)
            if self.DEBUG and limit >= self.DEBUG_LIMIT:
                break
            if os.path.isdir(annotation_text):
                for video_dir in os.listdir(annotation_text):
                    video_landmarks_dir = os.path.join(annotation_text, video_dir)
                    for file_name in os.listdir(video_landmarks_dir):
                        if file_name.endswith('.json'):
                            file_path = os.path.join(video_landmarks_dir, file_name)
                            with open(file_path, 'r') as f:
                                data_list = json.load(f)
                            landmarks_array = np.array([list(data.values()) for data in data_list])
                            if landmarks_array.shape[0] == self.NUM_LANDMARKS:
                                landmarks.append(landmarks_array)
                                labels.append(class_name)
        return np.array(landmarks), np.array(labels)

    def load_landmarks_from_files_as_tensor(self, root_folder):
        landmarks, labels = self.load_landmarks_from_files(root_folder)
        return torch.from_numpy(landmarks), labels
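For context, each per-frame JSON file holds one dict per landmark, with the dict values being the x/y/z coordinates. That is what the list-of-values parsing above assumes; a hypothetical example of one file:

# [
#   {"x": 0.51, "y": 0.32, "z": -0.04},
#   {"x": 0.49, "y": 0.35, "z": -0.02},
#   ... 42 entries in total, one per landmark ...
# ]
# np.array([list(data.values()) for data in data_list]) then has shape (42, 3).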
class Vocabulary:
    def __init__(self, freq_threshold):
        self.itos = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.stoi = {"<PAD>": 0, "<SOS>": 1, "<EOS>": 2, "<UNK>": 3}
        self.freq_threshold = freq_threshold

    def __len__(self):
        return len(self.itos)

    @staticmethod
    def tokenize(text):
        return text.lower().split()

    def build_vocabulary(self, sentence_list):
        frequencies = Counter()
        idx = 4
        for sentence in sentence_list:
            for word in self.tokenize(sentence):
                frequencies[word] += 1
                if frequencies[word] == self.freq_threshold:
                    self.stoi[word] = idx
                    self.itos[idx] = word
                    idx += 1

    def numericalize(self, text):
        tokenized_text = self.tokenize(text)
        return [
            self.stoi[token] if token in self.stoi else self.stoi["<UNK>"]
            for token in tokenized_text
        ]
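To illustrate how the vocabulary behaves, a quick toy example (made-up sentences, not my real data). Only words that reach freq_threshold get an index; everything else maps to <UNK>:

vocab = Vocabulary(freq_threshold=2)
vocab.build_vocabulary(["ride the bicycle", "park the bicycle"])
print(vocab.stoi)
# {'<PAD>': 0, '<SOS>': 1, '<EOS>': 2, '<UNK>': 3, 'the': 4, 'bicycle': 5}
print(vocab.numericalize("ride the bicycle"))
# [3, 4, 5]  ("ride" only occurs once, so it falls back to <UNK>)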
# Custom dataset
class SignLanguageDataset(Dataset):
    def __init__(self, sentences, landmarks, vocab):
        self.landmarks = landmarks
        self.sentences = sentences
        self.vocab = vocab

    def __len__(self):
        return len(self.landmarks)

    def __getitem__(self, idx):
        landmark = torch.tensor(self.landmarks[idx], dtype=torch.float32)
        raw_sentence = self.sentences[idx]
        numericalized_sentence = [self.vocab.stoi["<SOS>"]]
        numericalized_sentence += self.vocab.numericalize(raw_sentence)
        numericalized_sentence.append(self.vocab.stoi["<EOS>"])
        return torch.tensor(numericalized_sentence, dtype=torch.long), landmark
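Note: the default DataLoader collate only works here because all my sentences numericalize to the same length (the log below shows src as [32, 3], i.e. <SOS> token <EOS>). For variable-length sentences, a padding collate would be needed; a minimal sketch, assuming <PAD> stays at index 0:

from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    sentences, landmarks = zip(*batch)
    # Pad each sentence to the longest one in the batch with <PAD> (index 0)
    sentences = pad_sequence(list(sentences), batch_first=True, padding_value=0)
    return sentences, torch.stack(landmarks)

# train_iterator = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)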
# Encoder class
class Encoder(nn.Module):
    def __init__(self, vocab_size, hidden_dim, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, num_layers, batch_first=True)

    def forward(self, x):
        x = self.embedding(x)
        print(f"Encoder: {x.shape}")
        _, (hidden, cell) = self.lstm(x)
        return hidden, cell
# Decoder class
class Decoder(nn.Module):
    def __init__(self, hidden_dim, output_dim, num_layers):
        super().__init__()
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)  # output_dim is the number of landmarks * 3 (for x, y, z)

    def forward(self, x, hidden, cell):
        print(f"Decoder: {x.shape}")
        output, (hidden, cell) = self.lstm(x, (hidden, cell))
        prediction = self.fc(output)
        return prediction, hidden, cell
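For reference, PyTorch's nn.LSTM returns hidden/cell states of shape (num_layers, batch, hidden_dim), which is what lets the encoder state be handed straight to the decoder. A standalone shape check (vocab size 5, matching the Embedding(5, 126) in the printed model):

encoder = Encoder(vocab_size=5, hidden_dim=126, num_layers=4)
decoder = Decoder(hidden_dim=126, output_dim=126, num_layers=4)
src = torch.randint(0, 5, (32, 3))                # (batch, src_len)
hidden, cell = encoder(src)                       # each: (4, 32, 126) = (num_layers, batch, hidden_dim)
step = torch.zeros(32, 1, 126)                    # one decoder time step
pred, hidden, cell = decoder(step, hidden, cell)  # pred: (32, 1, 126)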
# Seq2Seq model
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg):
        print(f"trg:{trg.shape}")
        batch_size = trg.shape[0]
        trg_len = trg.shape[1]
        trg_dim = self.decoder.fc.out_features
        # print(f"trg_dim:{trg_dim}")
        # print(f"trg_len:{trg_len}")
        outputs = torch.zeros(batch_size, trg_len, trg_dim).to(trg.device)
        hidden, cell = self.encoder(src)
        input = torch.zeros((batch_size, 1, trg_dim), device=trg.device)  # Initial input, usually the embedding of <SOS>
        print(input.size())
        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[:, t] = output.squeeze(1)
            # teacher_force = torch.rand(1).item() < 0.5
            # input = trg[:, t].unsqueeze(1) if teacher_force else output
        print(f"output shape:{outputs.shape}")
        return outputs
# Function to train the model
def train(model, iterator, optimizer, criterion, device):
    model.train()
    epoch_loss = 0
    for batch in iterator:
        src, trg = batch
        src, trg = src.to(device), trg.to(device)
        print(src.size())
        print(trg.size())
        optimizer.zero_grad()
        output = model(src, trg)  # trg[::-1] changed
        output = output.contiguous().view(-1, output.shape[-1])
        trg = trg[:, 1:].contiguous().view(-1)
        loss = criterion(output, trg)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(iterator)
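For reference, tracing the shapes through train() with the batch from the log (src [32, 3], trg [32, 42, 3]) shows exactly where the two numbers in the error come from:

# output = model(src, trg)            -> (32, 42, 126)
# output.view(-1, output.shape[-1])   -> (32 * 42, 126) = (1344, 126)
# trg[:, 1:]                          -> (32, 41, 3)
# trg[:, 1:].contiguous().view(-1)    -> (32 * 41 * 3,)  = (3936,)
# criterion then sees input batch 1344 vs. target batch 3936.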
# Function to save the model
def save_model(model, path):
    torch.save(model.state_dict(), path)

# Function to load the model
def load_model(model, path):
    model.load_state_dict(torch.load(path))
    return model
# Infer function
def infer(model, text, vocab, device):
    model.eval()
    with torch.no_grad():
        # Tokenize and numericalize the input text
        tokens = [vocab.stoi["<SOS>"]] + vocab.numericalize(text) + [vocab.stoi["<EOS>"]]
        src = torch.tensor(tokens, dtype=torch.long).unsqueeze(0).to(device)
        # Pass the tokenized text through the encoder
        hidden, cell = model.encoder(src)
        # Initialize the input for the decoder with a zero tensor
        # (must match the decoder LSTM input size: NUM_LANDMARKS * 3 = HIDDEN_DIM)
        input = torch.zeros((1, 1, NUM_LANDMARKS * 3), device=device)
        outputs = []
        # Generate landmarks
        for _ in range(MAX_LENGTH):
            output, hidden, cell = model.decoder(input, hidden, cell)
            output_np = output.squeeze(0).cpu().numpy()
            outputs.append(output_np)
            # Check if all landmark points are zeros
            if np.all(output_np == 0):
                break
            input = output
        # Convert the list of outputs to a numpy array
        outputs = np.array(outputs)
    return outputs
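For completeness, this is roughly how I'd call infer once training works (a sketch; "bicycle" is just one of the class folders visible in the log, and the file names are the ones the training script below saves):

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
with open(os.path.join('output', VOCAB_NAME), 'rb') as f:
    vocab = pickle.load(f)
encoder = Encoder(len(vocab), HIDDEN_DIM, NUM_LAYERS)
decoder = Decoder(HIDDEN_DIM, OUTPUT_DIM, NUM_LAYERS)
model = load_model(Seq2Seq(encoder, decoder), os.path.join('output', f'{MODEL_NAME}.pt')).to(device)
frames = infer(model, "bicycle", vocab, device)  # array of predicted landmark frames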
# Main execution
if __name__ == "__main__":
    load_data = LoadData(NUM_LANDMARKS=42, DEBUG=DEBUG, NUM_SENTENCES=2)
    train_landmarks, train_sentences = load_data.load_landmarks_from_files(ROOT_FOLDER)

    # Create vocabulary
    vocab = Vocabulary(freq_threshold=2)
    vocab.build_vocabulary(train_sentences)
    os.makedirs('output', exist_ok=True)  # make sure the output directory exists
    with open(os.path.join('output', VOCAB_NAME), 'wb') as f:
        pickle.dump(vocab, f)
    INPUT_DIM = len(vocab)

    # Create dataset and dataloader
    train_dataset = SignLanguageDataset(train_sentences, train_landmarks, vocab)
    train_iterator = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

    # Initialize model
    encoder = Encoder(INPUT_DIM, HIDDEN_DIM, NUM_LAYERS)
    decoder = Decoder(HIDDEN_DIM, OUTPUT_DIM, NUM_LAYERS)
    model = Seq2Seq(encoder, decoder)
    print(model)

    # Define optimizer and loss
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.CrossEntropyLoss(ignore_index=vocab.stoi["<PAD>"])

    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print("[INFO] Available device: ", device)
    model = model.to(device)
    loss_history = []

    # Training loop
    for epoch in range(EPOCHS):
        train_loss = train(model, train_iterator, optimizer, criterion, device)
        loss_history.append(train_loss)
        print(f"Epoch: {epoch:02}, Train Loss: {train_loss:.3f}")
        if epoch % 10 == 0:
            print("[INFO] Saving model for epoch ", epoch)
            save_model(model, os.path.join('output', f'{MODEL_NAME}_{epoch}.pt'))
    print("Loss History: ", loss_history)

    # Save the model
    save_model(model, os.path.join('output', f'{MODEL_NAME}.pt'))
==========================================
D:\Downloads\Constient\sign-motion-regeneration\data\Bicycle 0
Seq2Seq(
(encoder): Encoder(
(embedding): Embedding(5, 126)
(lstm): LSTM(126, 126, num_layers=4, batch_first=True)
)
(decoder): Decoder(
(lstm): LSTM(126, 126, num_layers=4, batch_first=True)
(fc): Linear(in_features=126, out_features=126, bias=True)
)
)
[INFO] Available device: cpu
torch.Size([32, 3])
torch.Size([32, 42, 3])
trg:torch.Size([32, 42, 3])
Encoder: torch.Size([32, 3, 126])
torch.Size([32, 1, 126])
Decoder: torch.Size([32, 1, 126])
... (the same "Decoder: torch.Size([32, 1, 126])" line repeats 41 times, once per decoder step) ...
output shape:torch.Size([32, 42, 126])
Traceback (most recent call last):
  File "D:\Downloads\Constient\sign-motion-regeneration\test.py", line 280, in <module>
    train_loss = train(model, train_iterator, optimizer, criterion, device)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Downloads\Constient\sign-motion-regeneration\test.py", line 190, in train
    loss = criterion(output, trg)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Downloads\Constient\sign-motion-regeneration\venv\Lib\site-packages\torch\nn\modules\module.py", line 1553, in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Downloads\Constient\sign-motion-regeneration\venv\Lib\site-packages\torch\nn\modules\module.py", line 1562, in _call_impl
    return forward_call(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Downloads\Constient\sign-motion-regeneration\venv\Lib\site-packages\torch\nn\modules\loss.py", line 1188, in forward
    return F.cross_entropy(input, target, weight=self.weight,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Downloads\Constient\sign-motion-regeneration\venv\Lib\site-packages\torch\nn\functional.py", line 3104, in cross_entropy
    return torch._C._nn.cross_entropy_loss(input, target, weight, _Reduction.get_enum(reduction), ignore_index, label_smoothing)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: Expected input batch_size (1344) to match target batch_size (3936).
Can anyone please help me figure out what is going wrong here?