import torch
import torch.nn as nn
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from PIL import Image
import os
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import seaborn as sns
import numpy as np
from torch.cuda.amp import autocast, GradScaler
class ChannelAttention(nn.Module):
    def __init__(self, in_channels, reduction_ratio=16):
        super(ChannelAttention, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)
        # Clamp the reduced width to at least 1 so small channel counts
        # (e.g. 3 for RGB inputs) don't produce a zero-channel conv or a
        # channel mismatch with the incoming tensor
        reduced_channels = max(in_channels // reduction_ratio, 1)
        self.fc1 = nn.Conv2d(in_channels, reduced_channels, kernel_size=1, stride=1, padding=0)
        self.relu = nn.ReLU()
        self.fc2 = nn.Conv2d(reduced_channels, in_channels, kernel_size=1, stride=1, padding=0)

    def forward(self, x):
        avg_out = self.fc2(self.relu(self.fc1(self.avg_pool(x))))
        max_out = self.fc2(self.relu(self.fc1(self.max_pool(x))))
        out = avg_out + max_out
        return torch.sigmoid(out) * x
class SpatialAttention(nn.Module):
    def __init__(self, kernel_size=7):
        super(SpatialAttention, self).__init__()
        assert kernel_size in (3, 7), "kernel size must be 3 or 7"
        padding = 3 if kernel_size == 7 else 1
        self.conv = nn.Conv2d(2, 1, kernel_size, padding=padding)

    def forward(self, x):
        avg_out = torch.mean(x, dim=1, keepdim=True)
        max_out, _ = torch.max(x, dim=1, keepdim=True)
        out = torch.cat([avg_out, max_out], dim=1)
        out = self.conv(out)
        return torch.sigmoid(out) * x
class DualAttentionModule(nn.Module):
    def __init__(self, in_channels):
        super(DualAttentionModule, self).__init__()
        self.channel_attention = ChannelAttention(in_channels)
        self.spatial_attention = SpatialAttention()

    def forward(self, x):
        # The attention blocks already multiply their sigmoid maps by the
        # input, so multiplying by x again here would apply each attention
        # map twice
        out = self.channel_attention(x)
        out = self.spatial_attention(out)
        return out
class DualAttentionNetwork(nn.Module):
    def __init__(self, num_classes=10):
        super(DualAttentionNetwork, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.relu = nn.ReLU()
        self.dual_attention1 = DualAttentionModule(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.avg_pool = nn.AdaptiveAvgPool2d(7)  # AdaptiveAvgPool2d instead of MaxPool2d
        self.dual_attention2 = DualAttentionModule(64)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(64 * 7 * 7, 512)  # 64 channels x 7x7 after adaptive pooling
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        out = self.conv1(x)
        out = self.relu(out)
        out = self.dual_attention1(out)
        out = self.conv2(out)
        out = self.relu(out)
        out = self.dual_attention2(out)
        out = self.avg_pool(out)  # reduce spatial dimensions to 7x7
        out = self.flatten(out)   # flatten before fc1
        out = self.fc1(out)
        out = self.relu(out)
        out = self.fc2(out)
        return out
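# Optional sanity check, a minimal sketch assuming 3-channel inputs: thanks to
# the adaptive pooling, any spatial size should produce logits of shape
# (batch, num_classes)
_dummy = torch.randn(2, 3, 96, 96)
assert DualAttentionNetwork(num_classes=3)(_dummy).shape == (2, 3)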
####################################################################################################################
class ResidualBlock(nn.Module):
    def __init__(self, channels):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(channels, channels, kernel_size=3, stride=1, padding=1)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, stride=1, padding=1)

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.relu(out)
        out = self.conv2(out)
        out += residual
        out = self.relu(out)
        return out
class ESRGenerator(nn.Module):
    def __init__(self, num_channels=3, num_residual_blocks=16):
        super(ESRGenerator, self).__init__()
        # The generator reuses the same DualAttentionModule as the classifier
        self.dual_attention1 = DualAttentionModule(num_channels)
        self.conv1 = nn.Conv2d(num_channels, 64, kernel_size=3, stride=1, padding=1)
        self.relu = nn.ReLU()
        # Residual blocks
        residual_blocks = []
        for _ in range(num_residual_blocks):
            residual_blocks.append(ResidualBlock(64))
        self.residual_blocks = nn.Sequential(*residual_blocks)
        # Post-residual convolution
        self.post_residual_conv = nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1)
        # Pixel-shuffle upsampling (two 2x stages, 4x overall)
        self.upsample = nn.Sequential(
            nn.Conv2d(64, 256, kernel_size=3, stride=1, padding=1),
            nn.PixelShuffle(upscale_factor=2),
            nn.ReLU(),
            nn.Conv2d(64, 256, kernel_size=3, stride=1, padding=1),
            nn.PixelShuffle(upscale_factor=2),
            nn.ReLU(),
            nn.Conv2d(64, num_channels, kernel_size=3, stride=1, padding=1),
            nn.Tanh()  # Tanh activation for output (range -1 to 1)
        )

    def forward(self, x):
        out = self.dual_attention1(x)
        out = self.conv1(out)
        out = self.relu(out)
        residual = self.residual_blocks(out)
        out = self.post_residual_conv(residual) + out
        out = self.upsample(out)
        return out
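# Optional sanity check, a minimal sketch: the two PixelShuffle stages give
# 4x super-resolution, so a 24x24 input should come out 96x96 with the same
# channel count
_lr = torch.randn(1, 3, 24, 24)
assert ESRGenerator(num_channels=3)(_lr).shape == (1, 3, 96, 96)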
# ESRGAN Discriminator Network
class ESRDiscriminator(nn.Module):
    def __init__(self, num_channels=3):
        super(ESRDiscriminator, self).__init__()
        self.layers = nn.Sequential(
            nn.Conv2d(num_channels, 64, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.AdaptiveAvgPool2d(1),
        )
        self.fc = nn.Linear(512, 1)

    def forward(self, x):
        out = self.layers(x)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out
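# Optional sanity check, a minimal sketch: the final AdaptiveAvgPool2d
# collapses any input resolution, so the output should be one logit per image
assert ESRDiscriminator(num_channels=3)(torch.randn(2, 3, 96, 96)).shape == (2, 1)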
# Set device (GPU or CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define hyperparameters
num_epochs = 25
batch_size = 4
gradient_accumulation_steps = 4  # accumulate gradients over 4 batches per optimizer step
learning_rate = 0.001

# Load the dataset and apply transformations
transform = transforms.Compose([
    transforms.Resize((96, 96)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # per-channel stats for RGB
])
train_dataset = datasets.ImageFolder(root=r'/home/praveen_2221cs11/UAVWeedSegmentation-main/data/trainval', transform=transform)
test_dataset = datasets.ImageFolder(root=r'/home/praveen_2221cs11/UAVWeedSegmentation-main/data/test_different_bbch', transform=transform)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

# Initialize the Dual Attention Network, sized to the number of dataset classes
model = DualAttentionNetwork(num_classes=len(train_dataset.classes)).to(device)

# Define loss function, optimizer, and the GradScaler for mixed precision
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
scaler = GradScaler()

# Initialize the ESRGAN Generator and Discriminator
esr_generator = ESRGenerator(num_channels=3).to(device)
esr_discriminator = ESRDiscriminator(num_channels=3).to(device)

# Define loss functions for generator and discriminator
adv_criterion = nn.BCEWithLogitsLoss()
pixel_criterion = nn.L1Loss()

# Define optimizers for generator and discriminator
esr_generator_optimizer = torch.optim.Adam(esr_generator.parameters(), lr=learning_rate)
esr_discriminator_optimizer = torch.optim.Adam(esr_discriminator.parameters(), lr=learning_rate)
###############################################################################################################
# Training loop
torch.cuda.empty_cache()
train_accuracy_list = []
total_steps = len(train_loader)
for epoch in range(num_epochs):
    correct = 0
    total = 0
    optimizer.zero_grad()
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)
        # Run the forward pass in half precision under autocast
        with autocast():
            outputs = model(images)
            # Scale the loss so gradients accumulated over several batches
            # average out to the magnitude of a single large batch
            loss = criterion(outputs, labels) / gradient_accumulation_steps
        # Compute accuracy for this batch
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        # Backward pass with gradient scaling; do NOT zero the gradients
        # here, or the accumulation across batches is lost
        scaler.scale(loss).backward()
        # Step the optimizer once every gradient_accumulation_steps batches
        # (and on the final batch, so leftover gradients are not dropped)
        if (i + 1) % gradient_accumulation_steps == 0 or (i + 1) == total_steps:
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()
        # Print training progress
        if (i + 1) % 100 == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{total_steps}], Loss: {loss.item():.4f}")
    # Calculate the accuracy for the epoch and store it in the list
    accuracy = 100 * correct / total
    train_accuracy_list.append(accuracy)
# Plot the training accuracy graph
plt.plot(range(1, num_epochs + 1), train_accuracy_list, label='Training Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy (%)')
plt.title('Training Accuracy')
plt.legend()
plt.grid(True)
plt.savefig("training_accuracy_graph.png")  # save before show(), which clears the figure
plt.show()
# Save the trained model
torch.save(model.state_dict(), "dual_attention_model.pt")

# Load the saved model for inference (same num_classes as the saved model)
model_load = DualAttentionNetwork(num_classes=len(train_dataset.classes))
model_load.load_state_dict(torch.load("dual_attention_model.pt", map_location=device))
model_load.to(device)
model_load.eval()

# Rest of the code for generating the confusion matrix and inference
def get_predictions(model, dataloader):
    all_predictions = []
    all_labels = []
    model.eval()
    with torch.no_grad():
        for images, labels in dataloader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _, predicted_class = torch.max(outputs, 1)
            all_predictions.extend(predicted_class.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    return all_predictions, all_labels
def generate_confusion_matrix(model, dataloader):
    # Get predictions for the test dataset
    predicted_classes, true_labels = get_predictions(model, dataloader)
    # Define class labels
    class_labels = ["BG", "S", "W"]
    # Initialize confusion matrix for all classes
    conf_matrix = np.zeros((len(class_labels), len(class_labels)), dtype=np.int64)
    # Fill the confusion matrix
    for predicted, true in zip(predicted_classes, true_labels):
        if true >= len(class_labels) or predicted >= len(class_labels):
            continue
        conf_matrix[true, predicted] += 1
    # Calculate per-class accuracies
    per_class_accuracies = {}
    for i, label in enumerate(class_labels):
        total_samples = conf_matrix[i, :].sum()
        correct_predictions = conf_matrix[i, i]
        accuracy = correct_predictions / total_samples if total_samples > 0 else 0.0
        per_class_accuracies[label] = accuracy
    # Print the confusion matrix
    print("Confusion Matrix:")
    print(conf_matrix)
    # Row-normalize, replacing NaNs from empty rows with zeros
    conf_matrix_normalized = np.nan_to_num(
        conf_matrix.astype('float') / conf_matrix.sum(axis=1, keepdims=True),
        nan=0.0,
        posinf=0.0,
        neginf=0.0
    )
    # Print the normalized confusion matrix
    print("\nNormalized Confusion Matrix:")
    print(conf_matrix_normalized)
    # Save both matrices to separate files
    np.savetxt("confusion_matrix.txt", conf_matrix, fmt="%d", delimiter="\t")
    np.savetxt("confusion_matrix_normalized.txt", conf_matrix_normalized, fmt="%.2f", delimiter="\t")
    # Create a heatmap for the confusion matrix
    plt.figure(figsize=(6, 4))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues')
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.title('Confusion Matrix')
    plt.savefig("confusion_matrix_heatmap.png")  # save the heatmap as PNG
    plt.show()
    # Create a heatmap for the normalized confusion matrix
    plt.figure(figsize=(6, 4))
    sns.heatmap(conf_matrix_normalized, annot=True, fmt='.2f', cmap='Blues')
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.title('Normalized Confusion Matrix')
    plt.savefig("confusion_matrix_normalized_heatmap.png")  # save the heatmap as PNG
    plt.show()
# Generate confusion matrix and heatmaps for the test dataset
generate_confusion_matrix(model_load, test_loader)
# Function to perform inference on a single image
def test_single_image(image_path, model):
    # Use the same preprocessing as training (96x96, same normalization)
    transform = transforms.Compose([
        transforms.Resize((96, 96)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    image = Image.open(image_path).convert('RGB')
    image = transform(image).unsqueeze(0)
    # Move the image tensor to the same device as the model
    image = image.to(device)
    with torch.no_grad():
        output = model(image)
        _, predicted_class = torch.max(output, 1)
    return predicted_class.item()
class_labels = train_dataset.classes

# List of image file paths for testing
test_image_paths = [
    "/home/praveen_2221cs11/UAVWeedSegmentation-main/data/test/img/test/test_01.jpg",
    "/home/praveen_2221cs11/UAVWeedSegmentation-main/data/test/img/test/test_02.jpg",
    "/home/praveen_2221cs11/UAVWeedSegmentation-main/data/test/img/test/test_03.jpg"
    # Add more image file paths here if needed
]
# Perform inference on the listed test images
results = []
for image_path in test_image_paths:
    predicted_class = test_single_image(image_path, model_load)
    class_label = class_labels[predicted_class]
    results.append((image_path, class_label))

# Print the inference results
for image_path, class_label in results:
    print(f"Image: {image_path}, Predicted Class: {class_label}")
I am new to PyTorch and I am getting this error; kindly help me resolve it:

RuntimeError: CUDA error: out of memory
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
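As the last line suggests, the blocking-launch flag can be enabled so kernel errors are reported synchronously. A minimal sketch, assuming it runs at the very top of the script before any CUDA call:

# Enable synchronous CUDA error reporting, as the error message suggests;
# this must be set before the first CUDA call
import os
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"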