SMOTE or SMOTEN for creating synthetic data for chest X-rays

import numpy as np
import pandas as pd
import os
import pickle
from glob import glob
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import torch.nn as nn
import torch
import torchvision
import seaborn as sns
from tqdm import tqdm
from PIL import Image
from itertools import chain
import torch.nn.functional as F
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from torch.utils.data import TensorDataset
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score, roc_auc_score, \
    multilabel_confusion_matrix, roc_curve, auc
from sklearn.utils import resample
from imblearn.over_sampling import SMOTE
from collections import Counter

# Device configuration
if torch.backends.mps.is_available():
    mps_device = torch.device("mps")
else:
    print("MPS device not found, falling back to CPU.")
    mps_device = torch.device("cpu")

# Paths to Images and DataEntry file
all_xray_df = pd.read_csv('NihXrayData/Data_Entry_2017.csv')
allImagesGlob = glob('NihXrayData/images*/images/*.png')
# eof

all_image_paths = {os.path.basename(x): x for x in
                   allImagesGlob}
# print('Scans found:', len(all_image_paths), ', Total Headers', all_xray_df.shape[0])
all_xray_df['path'] = all_xray_df['Image Index'].map(all_image_paths.get)
all_xray_df.sample(3)

# Data Pre-Processing ####
# Simplifying to 15 primary classes (adding 'No Finding' as the 15th class)
condition_labels = ['Atelectasis', 'Consolidation', 'Infiltration', 'Pneumothorax', 'Edema', 'Emphysema', 'Fibrosis',
                    'Effusion', 'Pneumonia', 'Pleural_Thickening',
                    'Cardiomegaly', 'Nodule', 'Mass', 'Hernia', 'No Finding']
for label in condition_labels:
    all_xray_df[label] = all_xray_df['Finding Labels'].map(lambda result: 1.0 if label in result else 0)
all_xray_df.head(20)

all_xray_df['disease_vec'] = all_xray_df[condition_labels].values.tolist()

all_xray_df.head()

# eof of one hot encoding


# Splitting the Data Frame into an 80:20 train/test split ###
train_df, test_df = train_test_split(all_xray_df, test_size=0.20, random_state=2020)


#  eof Data Splitting ###

# Count the number of samples in each class
# class_counts = train_df[condition_labels].sum()
# print("Training DataFrame:\n", class_counts)
# test_count = test_df[condition_labels].sum()
# print("Test Dataframe distribution:\n", test_count)
# exit()

# class_counts2 = train_df[condition_labels].sum()
# total_samples = len(train_df)
# class_weights = total_samples / (len(condition_labels) * class_counts2)
# # Convert class weights to a tensor
# class_weights_tensor = torch.FloatTensor(class_weights.values)

# List to store the resampled dataframes for each class
# target_samples_per_class = 3000
# resampled_dfs = []
#
# for label in condition_labels:
#     # Get the DataFrame for the current class
#     class_df = train_df[train_df[label] == 1]
#
#     # Get the number of samples in the current class
#     num_samples_class = len(class_df)
#
#     if num_samples_class < target_samples_per_class:
#         # If the class has fewer samples than the desired number, use resample to oversample it
#         resampled_class_df = resample(class_df,
#                                       replace=True,
#                                       n_samples=target_samples_per_class,
#                                       random_state=42)
#     else:
#         # If the class has enough samples, randomly select the desired number of samples
#         resampled_class_df = class_df.sample(n=target_samples_per_class, random_state=42)
#
#     # Add the resampled DataFrame to the list
#     resampled_dfs.append(resampled_class_df)
#
# # Concatenate all resampled DataFrames to create the final balanced DataFrame
# train_df_balanced = pd.concat(resampled_dfs)
#
# # Shuffle the final balanced DataFrame
# train_df_balanced = train_df_balanced.sample(frac=1, random_state=42)
#
# # Print class distribution after balancing
# class_counts_after_balancing = train_df_balanced[condition_labels].sum()
# print("Class distribution after balancing:")
# print(class_counts_after_balancing)

class XrayDataset(torch.utils.data.Dataset):
    def __init__(self, data_frame, transform=None):
        self.data_frame = data_frame
        self.transform = transform

    def __getitem__(self, idx):
        idx = idx % len(self.data_frame)  # Wrap around the index to ensure it's within the valid range
        row = self.data_frame.iloc[idx]
        address = row['path']
        data = Image.open(address).convert('RGB')
        label = np.array(row['disease_vec'], dtype=np.float32)

        if self.transform:
            data = self.transform(data)

        return data, torch.FloatTensor(label)

    def __len__(self):
        return len(self.data_frame)


# Define data augmentation for training
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

# Create the Datasets for the train & test data frames, with data augmentation for training
# Take a subset of the training data to keep experiments manageable
subset_size = 5000
train_df_subset = train_df.sample(n=subset_size, random_state=42)

# Create the Dataset for the train data frame with data augmentation for training
train_dataset = XrayDataset(train_df_subset, transform=train_transform)

# Apply SMOTE to the training dataset
# X_train = np.array([x for x, _ in train_dataset])
# y_train = np.array([y for _, y in train_dataset])
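
# For reference: imblearn's SMOTE.fit_resample expects a numeric 2-D feature matrix X of
# shape (n_samples, n_features) and a 1-D class vector y; it does not accept a 2-D
# multi-label target. Toy illustration only (X_toy/y_toy are made-up names, not part of
# this pipeline):
# X_toy = np.random.rand(100, 20)                  # 100 samples, 20 numeric features
# y_toy = np.array([0] * 90 + [1] * 10)            # imbalanced binary labels
# X_bal, y_bal = SMOTE(sampling_strategy={1: 50},  # grow class 1 to 50 samples
#                      random_state=42).fit_resample(X_toy, y_toy)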

# Separate single-label and multi-label training data
single_label_df = train_df_subset[train_df_subset[condition_labels].sum(axis=1) == 1]
multi_label_df = train_df_subset[train_df_subset[condition_labels].sum(axis=1) > 1]

hernia_single_label_df = train_df[train_df['Hernia'] == 1]
X_hernia = hernia_single_label_df.drop(columns=condition_labels + ['Finding Labels', 'disease_vec', 'path'])
y_hernia = hernia_single_label_df[condition_labels]

# Convert X_hernia and y_hernia to numpy arrays
X_hernia = X_hernia.fillna(0)
X_hernia = X_hernia.values
y_hernia = y_hernia.values

# Note: this keeps y_hernia as a 2-D multi-label array of shape (n_samples, 15),
# not a 1-D binary vector
y_hernia = (y_hernia == 1).astype(int)


target_samples_hernia = 3000
smote = SMOTE(sampling_strategy={'Hernia': target_samples_hernia}, random_state=42, n_jobs=-1)
print("X_hernia data type:", type(X_hernia))
print("X_hernia shape:", X_hernia.shape)
print("y_hernia data type:", type(y_hernia))
print("y_hernia shape:", y_hernia.shape)
X_resampled_hernia, y_resampled_hernia = smote.fit_resample(X_hernia, y_hernia)
exit()

y_resampled_hernia = y_resampled_hernia.reshape(-1, 1)
# Combine the resampled "Hernia" class data with the rest of the balanced data
X_resampled = np.concatenate([X_train, X_resampled_hernia.reshape(-1, 3, 224, 224)], axis=0)
y_resampled = np.concatenate([y_train, y_resampled_hernia], axis=0)

# Convert back to tensors and update the train_dataset
train_dataset = TensorDataset(torch.tensor(X_resampled_hernia, dtype=torch.float32), torch.tensor(y_resampled_hernia, dtype=torch.float32))
exit()

test_dataset = XrayDataset(test_df, transform=transforms.Compose([
    transforms.Resize(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
]))


# eof Dataloader #
np.random.seed(42)
torch.manual_seed(42)

# Set up ResNet 50 model
# Load pre-trained ResNet50 model
base_model = torchvision.models.resnet50(pretrained=True)
# Freeze the parameters of the base model
for param in base_model.parameters():
    param.requires_grad = False

# Replace the last fully connected layer with a new one for multi-label classification
num_features = base_model.fc.in_features
base_model.fc = nn.Linear(num_features, 15)

# Create the final model
model = base_model.to(mps_device)
# # Print the model summary
# print(model)

# Convert the class weights to a tensor
# Update class_counts, total_samples, and class_weights using the balanced DataFrame
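# NOTE: train_df_balanced is created by the commented-out per-class resample block
# above; uncomment that block (or substitute train_df_subset) before this will run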
class_counts = train_df_balanced[condition_labels].sum()
total_samples = len(train_df_balanced)
class_weights = total_samples / (len(condition_labels) * class_counts)

class_weights_tensor = torch.FloatTensor(class_weights.values)

class_sample_counts = train_df_balanced[condition_labels].sum(axis=0)
class_weights_list = 1.0 / (class_sample_counts / total_samples)
weights = train_df_balanced[condition_labels].apply(lambda x: class_weights_list * x, axis=1).sum(axis=1)
weights_tensor = torch.DoubleTensor(weights.values)

weighted_sampler = WeightedRandomSampler(weights=weights_tensor, num_samples=len(weights_tensor), replacement=True)

# Create data loaders with the weighted sampler
train_loader = DataLoader(
    train_dataset,
    batch_size=64,
    num_workers=0,
    shuffle=False,  # Do not shuffle when using WeightedRandomSampler
    sampler=weighted_sampler  # Use the custom weighted sampler
)

test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=64,
    num_workers=0,
    shuffle=False,
)

num_epochs = 1
weight_decay = 1e-4
learning_rate = 0.01
# eof Hyper Parameters


# criterion = nn.BCEWithLogitsLoss(weight=class_weights_tensor).to(mps_device)
criterion = nn.BCEWithLogitsLoss(pos_weight=class_weights_tensor).to(mps_device)

# criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)


def train(epoch):
    model.train()
    running_loss = 0.0
    train_total, train_correct = 0.0, 0.0

    for i, (images, labels) in enumerate(train_loader):
        images = images.to(mps_device)
        labels = labels.to(mps_device)
        # Forward pass
        optimizer.zero_grad()
        outputs = model(images)
        # Loss Function
        loss = criterion(outputs, labels)
        # predicted_labels = (outputs > 0.5).float()
        # Backward and optimize
        # optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        # _, train_predicted = torch.argmax(y_output)
        train_total += labels.size(0)
        # train_correct += (train_predicted == labels.long()).sum().item()
        # y_train += labels.tolist()
        # y_pred += train_predicted.tolist()
        # train_correct += (predicted_labels == labels).sum().item()

        if i % 200 == 0:
            print('Epoch: {} [{}/{} ({:.0f}%)]\tloss: {:.6f}'.format(
                epoch, i * len(images), len(train_loader.dataset),
                       100. * i / len(train_loader), loss.item()))
    # Step the scheduler after each epoch
    scheduler.step()


# After creating the optimizer, create the learning rate scheduler
scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)
# Train the model
for epoch in range(1, num_epochs + 1):
    train(epoch)


def test(model, data_loader, device):
    model.eval()
    test_predictions = []
    test_labels = []
    class_accuracy = []
    class_precision = []
    class_f1_score = []
    class_subset_accuracy = []  # Add a list to store subset accuracy for each class
    single_label_correct, multi_label_correct = 0.0, 0.0
    single_label_total, multi_label_total = 0.0, 0.0

    with torch.no_grad():
        for images, labels in data_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            predicted_probs = torch.sigmoid(outputs)
            predicted_labels = (predicted_probs > 0.12).float()

            test_predictions.append(predicted_labels.cpu().numpy())
            test_labels.append(labels.cpu().numpy())

            # Calculate single-label and multi-label accuracy
            single_label_mask = labels.sum(dim=1) == 1
            multi_label_mask = labels.sum(dim=1) > 1
            single_label_correct += (predicted_labels[single_label_mask] == labels[single_label_mask]).sum().item()
            multi_label_correct += (predicted_labels[multi_label_mask] == labels[multi_label_mask]).sum().item()
            single_label_total += single_label_mask.sum().item()
            multi_label_total += multi_label_mask.sum().item()

    test_predictions = np.concatenate(test_predictions)
    test_labels = np.concatenate(test_labels)
    micro_f1 = f1_score(test_labels, test_predictions, average='micro', zero_division=1)
    accuracy = accuracy_score(test_labels, test_predictions)

    # Calculate prediction accuracy, precision, and F1 score for each class
    for i, class_label in enumerate(condition_labels):
        class_accuracy.append(accuracy_score(test_labels[:, i], test_predictions[:, i]))
        class_precision.append(precision_score(test_labels[:, i], test_predictions[:, i]))
        class_f1_score.append(f1_score(test_labels[:, i], test_predictions[:, i]))

        # Calculate subset accuracy for the current class
        class_subset_mask = (test_labels[:, i] == test_predictions[:, i])
        class_subset_accuracy.append(class_subset_mask.mean())

    print('Model Micro F1-score: %.4f' % micro_f1)
    print('Model Accuracy: %.4f' % accuracy)
    print('Single-Label Accuracy: %.4f' % (single_label_correct / single_label_total))
    print('Multi-Label Accuracy: %.4f' % (multi_label_correct / multi_label_total))

    print('Prediction Metrics per Class:')
    for i, class_label in enumerate(condition_labels):
        print('%s - Accuracy: %.4f, Precision: %.4f, F1-score: %.4f, Subset Accuracy: %.4f' % (
            class_label, class_accuracy[i], class_precision[i], class_f1_score[i], class_subset_accuracy[i]))

    return test_labels, test_predictions, class_accuracy, class_precision, class_f1_score


test_labels, test_predictions, class_accuracy, class_precision, class_f1_score = test(model, test_loader, mps_device)

# Multi-Label Confusion Matrix
confusion = multilabel_confusion_matrix(test_labels, test_predictions)
print('Confusion Matrix\n')
for i, cm in enumerate(confusion):
    print(f'Class {condition_labels[i]}:\n{cm}\n')

# create plot
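# NOTE: test_predictions here are thresholded 0/1 labels, so each ROC curve is built from
# binary scores; feeding the sigmoid probabilities instead would give a smoother curve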
fig, c_ax = plt.subplots(1, 1, figsize=(9, 9))
for (i, label) in enumerate(condition_labels):
    fpr, tpr, thresholds = roc_curve(test_labels[:, i].astype(int), test_predictions[:, i])
    c_ax.plot(fpr, tpr, label='%s (AUC:%0.2f)' % (label, auc(fpr, tpr)))

# Set labels for plot
c_ax.legend()
c_ax.set_xlabel('False Positive Rate')
c_ax.set_ylabel('True Positive Rate')
plt.show()

Hello, I'm building a multi-label classification model for chest X-rays. I'm trying to use SMOTE to create synthetic data, but it keeps breaking at fit_resample and I can't figure out why. If there are any suggestions other than SMOTE for creating synthetic data for multi-label problems, please let me know; I'd appreciate the help.
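
For reference, this is the kind of input I understand fit_resample wants (a 2-D numeric feature matrix plus a 1-D target for a single label). The names below (X_flat, y_flat, smote_hernia) are only illustrative, and since SMOTE just interpolates between flattened pixel vectors, I'm not sure the synthetic samples would even look like real X-rays:

X_flat = np.stack([img.numpy().reshape(-1) for img, _ in train_dataset])  # flattened transformed images, shape (N, 3*224*224); memory heavy
y_flat = train_df_subset['Hernia'].values.astype(int)  # 1-D binary target for the single label being oversampled
smote_hernia = SMOTE(sampling_strategy={1: target_samples_hernia}, random_state=42)  # needs more than k_neighbors positive rows in the subset
X_res, y_res = smote_hernia.fit_resample(X_flat, y_flat)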