import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import cv2
from torch.utils.data import Dataset, DataLoader
import torchvision.models.video as models
# -----------------------------
# Dataset Class
# -----------------------------
class LipReadingDataset(Dataset):
    """Lip-reading clips stored as one folder of JPEG frames per video.

    Frames are read from ``<video_dir>/<video_name>/frame_<i>.jpg`` with ``i``
    starting at 1. Labels come from ``label_file``, one ``video_name,label``
    pair per line. Each item is the first ``seq_length`` readable frames of a
    video as a float32 tensor of shape ``(seq_length, 3, 72, 128)`` together
    with its label as a long tensor.

    Args:
        video_dir: Directory that contains one sub-folder per video.
        label_file: Path of the ``video_name,label`` text file.
        seq_length: Number of frames per returned clip.
        transform: Optional callable. It receives the clip with a leading
            axis of size one, i.e. ``(1, seq_length, 3, 72, 128)``, and the
            first element of its result is used.
        normalize: When true (default), reorder the BGR channels produced by
            OpenCV to RGB, scale pixel values to ``[0, 1]`` and standardise
            them with the Kinetics-400 mean/std that torchvision documents
            for its pretrained video ResNets. Pass ``False`` to get the raw
            0-255 BGR values.
    """

    # (width, height) in the order cv2.resize expects.
    FRAME_SIZE = (128, 72)
    # Per-channel RGB statistics used by torchvision's Kinetics-400 video models.
    KINETICS_MEAN = (0.43216, 0.394666, 0.37645)
    KINETICS_STD = (0.22803, 0.22145, 0.216989)

    def __init__(self, video_dir, label_file, seq_length=30, transform=None, normalize=True):
        self.video_dir = video_dir
        self.label_file = label_file
        self.seq_length = seq_length
        self.transform = transform
        self.normalize = normalize
        self.video_files, self.labels = self._load_labels(label_file)

    def _load_labels(self, label_file):
        """Parse ``label_file`` and keep the entries whose folder exists.

        Blank lines are ignored. Lines naming a video that is not present in
        ``video_dir`` are skipped without parsing their label, so a header row
        does not raise.

        Returns:
            A ``(video_files, labels)`` pair of equally long lists.

        Raises:
            ValueError: If a non-blank line has no comma, or if the label of
                an available video is not an integer.
        """
        video_files = []
        labels = []
        available_videos = set(os.listdir(self.video_dir))
        with open(label_file, 'r') as f:
            for line_number, raw_line in enumerate(f, start=1):
                line = raw_line.strip()
                if not line:
                    continue
                name, separator, label_text = line.rpartition(',')
                if not separator:
                    raise ValueError(
                        f"{label_file}:{line_number}: expected 'video,label', got {line!r}"
                    )
                name = name.strip()
                if name not in available_videos:
                    continue
                try:
                    label = int(label_text)
                except ValueError as err:
                    raise ValueError(
                        f"{label_file}:{line_number}: label {label_text!r} is not an integer"
                    ) from err
                video_files.append(name)
                labels.append(label)
        return video_files, labels

    def _load_video_frames(self, video_file, max_frames=None):
        """Read consecutive ``frame_<i>.jpg`` files of one video.

        Reading stops at the first missing frame file, or as soon as
        ``max_frames`` frames have been decoded. Files that OpenCV cannot
        decode are skipped.

        Args:
            video_file: Name of the video folder inside ``video_dir``.
            max_frames: Optional upper bound on the number of frames to load.

        Returns:
            A ``uint8`` array of shape ``(T, 3, 72, 128)`` in BGR channel order.

        Raises:
            RuntimeError: If no frame could be loaded.
        """
        frames = []
        frames_folder = os.path.join(self.video_dir, video_file)
        frame_index = 1
        while max_frames is None or len(frames) < max_frames:
            frame_path = os.path.join(frames_folder, f"frame_{frame_index}.jpg")
            if not os.path.exists(frame_path):
                break
            frame = cv2.imread(frame_path)
            frame_index += 1
            if frame is None:
                continue
            frames.append(cv2.resize(frame, self.FRAME_SIZE))
        if not frames:
            raise RuntimeError(f"No frames found in {frames_folder}")
        frames = np.array(frames)
        return np.transpose(frames, (0, 3, 1, 2))  # (T, H, W, C) -> (T, C, H, W)

    def _fit_length(self, frames):
        """Return exactly ``seq_length`` frames from a non-empty frame array.

        Longer inputs are truncated. Shorter inputs are padded by repeating
        the last frame, so a short video still yields real image content
        rather than an all-zero clip.
        """
        frames = frames[:self.seq_length]
        missing = self.seq_length - len(frames)
        if missing > 0:
            padding = np.repeat(frames[-1:], missing, axis=0)
            frames = np.concatenate([frames, padding], axis=0)
        return frames

    def _normalize_clip(self, clip):
        """Convert a ``(T, 3, H, W)`` BGR 0-255 clip to standardised RGB float32."""
        clip = np.asarray(clip, dtype=np.float32)[:, ::-1] / 255.0  # BGR -> RGB, [0, 1]
        mean = np.asarray(self.KINETICS_MEAN, dtype=np.float32).reshape(1, 3, 1, 1)
        std = np.asarray(self.KINETICS_STD, dtype=np.float32).reshape(1, 3, 1, 1)
        return (clip - mean) / std

    def __len__(self):
        """Return the number of labelled videos found on disk."""
        return len(self.video_files)

    def __getitem__(self, idx):
        """Return ``(clip, label)`` for the video at position ``idx``.

        Raises:
            RuntimeError: If the video folder contains no readable frame.
        """
        video_file = self.video_files[idx]
        label = self.labels[idx]
        # Only the first seq_length frames are used, so do not decode more.
        frames = self._load_video_frames(video_file, max_frames=self.seq_length)
        clip = self._fit_length(frames)
        if self.transform:
            clip = self.transform(clip[np.newaxis])[0]
        if self.normalize:
            clip = self._normalize_clip(clip)
        clip = np.ascontiguousarray(clip, dtype=np.float32)
        return torch.tensor(clip, dtype=torch.float32), torch.tensor(label, dtype=torch.long)
# -----------------------------
# Model Definition (ResNet3D + LSTM)
# -----------------------------
class LipReadingResNetLSTM(nn.Module):
    """3D-ResNet feature extractor followed by an LSTM and a two-layer classifier.

    Args:
        input_channels: Accepted for interface compatibility; the backbone is
            built by ``r3d_18`` as-is and this value is not used.
        num_classes: Size of the output logit vector.
        seq_length: Stored on the module as ``self.seq_length``; the forward
            pass takes the clip length from the input tensor itself.
    """

    def __init__(self, input_channels=3, num_classes=292, seq_length=30):
        super().__init__()
        self.seq_length = seq_length
        # NOTE(review): newer torchvision releases prefer the `weights=` argument
        # over `pretrained=True` -- confirm against the installed version.
        self.resnet3d = models.r3d_18(pretrained=True)
        self.resnet3d.fc = nn.Identity()
        self.lstm = nn.LSTM(input_size=512, hidden_size=512, num_layers=2, batch_first=True, dropout=0.5)
        self.fc1 = nn.Linear(512, 1024)
        self.fc2 = nn.Linear(1024, num_classes)

    def _temporal_features(self, x):
        """Run the backbone on ``(B, C, T, H, W)`` and keep its time axis.

        The backbone's own global pooling collapses time as well as space,
        which would leave the LSTM with a single vector. Here only the two
        spatial axes are averaged, giving ``(B, T', 512)`` where ``T'`` is the
        temporal length that remains after the backbone's stages.
        """
        backbone = self.resnet3d
        x = backbone.stem(x)
        x = backbone.layer1(x)
        x = backbone.layer2(x)
        x = backbone.layer3(x)
        x = backbone.layer4(x)  # (B, 512, T', H', W')
        x = x.mean(dim=(3, 4))  # (B, 512, T')
        return x.transpose(1, 2)  # (B, T', 512)

    def forward(self, x):
        """Classify a batch of clips.

        Args:
            x: Tensor of shape ``(B, T, C, H, W)``.

        Returns:
            Logits of shape ``(B, num_classes)``.
        """
        # Swap the time and channel axes. A reshape to (B, C, T, H, W) would only
        # reinterpret the memory layout and mix frames with colour channels.
        x = x.permute(0, 2, 1, 3, 4).contiguous()  # (B, C, T, H, W)
        x = self._temporal_features(x)
        lstm_out, _ = self.lstm(x)
        x = lstm_out[:, -1, :]  # output at the last time step
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x
# -----------------------------
# Training Loop
# -----------------------------
def train_model(training_video_dir, validation_video_dir, train_label_file, val_label_file, model_save_path,
                num_epochs=10, batch_size=4, learning_rate=0.001, num_classes=292, seq_length=30):
    """Train ``LipReadingResNetLSTM`` and checkpoint the best and last weights.

    After every epoch the average validation loss is compared with the best so
    far; on improvement the state dict is written to ``model_save_path``. Once
    all epochs are done, the final weights are written next to it with a
    ``last_epoch_`` file-name prefix.

    Args:
        training_video_dir: Directory with the training frame folders.
        validation_video_dir: Directory with the validation frame folders.
        train_label_file: Label file for the training set.
        val_label_file: Label file for the validation set.
        model_save_path: Destination of the best checkpoint.
        num_epochs: Number of passes over the training set.
        batch_size: Batch size for both loaders.
        learning_rate: Adam learning rate.
        num_classes: Number of output classes of the model.
        seq_length: Frames per clip, used for both datasets and the model.

    Raises:
        ValueError: If either dataset contains no samples.
    """
    train_dataset = LipReadingDataset(video_dir=training_video_dir, label_file=train_label_file,
                                      seq_length=seq_length)
    val_dataset = LipReadingDataset(video_dir=validation_video_dir, label_file=val_label_file,
                                    seq_length=seq_length)
    # Fail early with a readable message instead of dividing by a zero batch count.
    for split_name, dataset, directory in (("training", train_dataset, training_video_dir),
                                           ("validation", val_dataset, validation_video_dir)):
        if len(dataset) == 0:
            raise ValueError(
                f"The {split_name} dataset is empty: no label entry matches a folder in {directory}"
            )

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    model = LipReadingResNetLSTM(input_channels=3, num_classes=num_classes, seq_length=seq_length)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    best_loss = float('inf')
    for epoch in range(num_epochs):
        avg_train_loss = _run_epoch(model, train_loader, criterion, device, optimizer)
        print(f"Epoch [{epoch+1}/{num_epochs}] - Training Loss: {avg_train_loss:.4f}")

        avg_val_loss = _run_epoch(model, val_loader, criterion, device)
        print(f"Epoch [{epoch+1}/{num_epochs}] - Validation Loss: {avg_val_loss:.4f}")

        # Save model if validation improves
        if avg_val_loss < best_loss:
            best_loss = avg_val_loss
            torch.save(model.state_dict(), model_save_path)
            print(f"Saved best model (val loss: {best_loss:.4f})")

    # Save last epoch
    torch.save(model.state_dict(), _last_epoch_path(model_save_path))
    print(f"Finished training. Best val loss: {best_loss:.4f}. Model saved to: {model_save_path}")


def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader`` and return the mean per-batch loss.

    With an ``optimizer`` the model is put in training mode and updated after
    every batch; without one it is put in eval mode and gradients are disabled.
    """
    training = optimizer is not None
    model.train(training)
    total_loss = 0.0
    with torch.set_grad_enabled(training):
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            if training:
                loss.backward()
                optimizer.step()
            total_loss += loss.item()
    return total_loss / len(loader)


def _last_epoch_path(model_save_path):
    """Prefix only the file name with ``last_epoch_``, keeping any directory part."""
    directory, filename = os.path.split(model_save_path)
    return os.path.join(directory, f"last_epoch_{filename}")
# -----------------------------
# Example Usage
# -----------------------------
if __name__ == "__main__":
    # Default locations: frame folders and label files sit next to the script.
    training_video_dir, validation_video_dir = './Training', './Validation'
    train_label_file, val_label_file = './train_split.csv', './val_split.csv'
    model_save_path = 'best_lipreading_model.pth'

    # Train with the default hyper-parameters of train_model.
    train_model(
        training_video_dir,
        validation_video_dir,
        train_label_file,
        val_label_file,
        model_save_path,
    )
I’ve tried reducing the learning rate, changing the folder structure, and randomizing the CSV file in the hope that something would change, but still nothing changes.