Here’s the dataset class:
class arm_telemetry_data(torch.utils.data.Dataset):
    '''
    Map-style Dataset over per-iteration telemetry files.

    Each item is one file from a single split ('train', 'trainSG' or 'test').
    The file is parsed with the parser selected by `file_type` and, if
    `normalize` is True, every channel (row) is standardized to zero mean and
    unit standard deviation.

    Implements the three methods torch's Dataset protocol needs:
        __init__(...)      : stores the file paths of the split (files are read lazily)
        __getitem__(index) : parses one file, e.g. dataset[0]
        __len__()          : number of files in the split, i.e. len(dataset)
    '''

    def __init__(self, partition, labels, split, file_type, normalize):
        '''
        -partition = {'train' : [file_path1, file_path2, ..],
                      'trainSG' : [file_path1, file_path2, ..],
                      'test' : [file_path1, file_path2, ..]}
        -labels = {file_path1 : 0, file_path2: 1, ...}
        -split = 'train', 'trainSG', or 'test'
        -file_type = 'dvfs' or 'simpleperf' [Different parsers for different file types]
        -normalize = if True, standardize each channel of a sample in __getitem__

        Raises NotImplementedError if `split` is not one of the names above.
        '''
        if split not in ('train', 'trainSG', 'test'):
            raise NotImplementedError('Can only accept Train, TrainSG, Test')
        # File paths belonging to the requested split; one path = one sample.
        self.path_ids = partition[split]
        # Mapping file path -> label (shared across splits).
        self.labels = labels
        # Whether __getitem__ standardizes each channel (no default; caller decides).
        self.normalize = normalize
        # Selects the parser used in __getitem__.
        self.file_type = file_type

    def __len__(self):
        return len(self.path_ids)

    def __getitem__(self, idx):
        '''
        Parse the idx-th file of the split.

        Returns (X, y, path): the parsed (optionally normalized) tensor, the label
        looked up in `labels`, and the file path.

        Raises ValueError for an unknown file_type, and RuntimeError if a parser
        raises StopIteration (see comment below).
        '''
        path = self.path_ids[idx]
        y = self.labels[path]
        try:
            if self.file_type == 'dvfs':
                # NOTE: Each dvfs file has a different sampling frequency
                X = self.read_dvfs_file(path)
            elif self.file_type == 'simpleperf':
                X = self.read_simpleperf_file(path)
            else:
                raise ValueError('Incorrect file type provided to the dataloader')
        except StopIteration as err:
            # A StopIteration escaping here would bubble out of the DataLoader
            # iterator's __next__ and make the caller's `for` loop stop early
            # without any error. Surface it instead.
            raise RuntimeError(f'Parser raised StopIteration while reading {path}') from err
        if self.normalize:
            X = self._standardize_channels(X)
        return X, y, path

    @staticmethod
    def _standardize_channels(X, eps=1e-12):
        '''Standardize each row of X (assumed Nchannel x num_data_points) to zero mean / unit std.'''
        mean_ch = torch.mean(X, dim=1, keepdim=True)
        # Floor the std so constant channels map to 0 instead of dividing by zero.
        # torch.clamp keeps X's device and dtype (no separately allocated CPU floor tensor).
        std_ch = torch.clamp(torch.std(X, dim=1, keepdim=True), min=eps)
        return (X - mean_ch) / std_ch
Here’s the function that creates the dataloader:
def get_dataloader(args, partition, labels, custom_collate_fn, required_partitions, normalize_flag, file_type, N=None):
    '''
    Returns the DataLoader objects for the 'train', 'trainSG' and 'test' partitions.

    params:
        -args : namespace read for num_workers, train_batchsz, test_batchsz,
                train_shuffle and test_shuffle (only for the partitions that are built)
        -partition = {'train' : [file_path1, file_path2, ..],
                      'trainSG' : [file_path1, file_path2, ..],
                      'test' : [file_path1, file_path2, ..]}
        -labels : {file_path1 : 0, file_path2: 1, ...} (Benigns have label 0 and Malware have label 1)
        -custom_collate_fn : collate callable passed to every DataLoader (e.g. a custom_collator instance)
        -required_partitions : {"train": bool, "trainSG": bool, "test": bool};
                               a loader is only built for partitions set to True
        -normalize_flag : forwarded as `normalize` to arm_telemetry_data
                          [Should be set to True for 'dvfs' and False for 'simpleperf']
        -file_type : 'dvfs' or 'simpleperf' -> Different parsers used for each file_type
        -N : optional [num_training_samples, num_trainSG_samples, num_testing_samples].
             If given, only the FIRST N[i] files of each required partition are used.
    Output:
        - trainloader, trainSGloader, testloader : DataLoader objects, or None for
          partitions not requested in required_partitions.
    Raises:
        NotImplementedError : if N does not have exactly 3 entries, or requests more
                              samples than a required partition contains.
    '''
    if N is not None:
        # Validate before indexing N so a short list gets the intended error
        # rather than an IndexError from the print below.
        if len(N) != 3:
            raise NotImplementedError('Size of Array should be 3')
        print(f'[Info] ############### Using Subset : Num_train = {N[0]}, Num_trainSG = {N[1]}, Num_test = {N[2]} ##################')

    # Per partition, in output order: (batch-size attribute, shuffle attribute) on args.
    # Attributes are read lazily so args only needs fields for requested partitions.
    loader_settings = {
        'train': ('train_batchsz', 'train_shuffle'),
        'trainSG': ('train_batchsz', 'test_shuffle'),
        'test': ('test_batchsz', 'test_shuffle'),
    }

    loaders = []
    for position, (split, (batch_attr, shuffle_attr)) in enumerate(loader_settings.items()):
        if not required_partitions[split]:
            loaders.append(None)
            continue
        dataset = arm_telemetry_data(partition, labels, split=split, file_type=file_type, normalize=normalize_flag)
        if N is not None:
            dataset = _take_first_n(dataset, N[position])
        # NOTE: the previous version also passed sampler=SequentialSampler(...) for
        # trainSG/test. DataLoader rejects a sampler combined with shuffle=True, and
        # SequentialSampler is already the default when shuffle=False, so the shuffle
        # flag alone expresses the same thing without the conflict.
        loaders.append(torch.utils.data.DataLoader(
            dataset,
            num_workers=args.num_workers,
            batch_size=getattr(args, batch_attr),
            collate_fn=custom_collate_fn,
            shuffle=getattr(args, shuffle_attr),
        ))

    trainloader, trainSGloader, testloader = loaders
    return trainloader, trainSGloader, testloader


def _take_first_n(dataset, n):
    '''Return a Subset holding the first n items of dataset; raise if it has fewer than n.'''
    available = len(dataset)
    if n > available:
        raise NotImplementedError(f'More samples than present in DS. Demanded : {n} | Available: {available}')
    return torch.utils.data.Subset(dataset, range(n))
And this is the custom collate_fn object:
class custom_collator(object):
    '''
    Collate function object for the DataLoader.

    Turns a list of (X, label, path) samples into (batch, labels_tensor, paths).
    For 'dvfs' it optionally resamples (resample_dvfs) and feature-reduces
    (perform_feature_reduction); for 'simpleperf' it optionally reduces each
    sample to per-bin sums. Those helper methods are not part of this listing.
    '''

    def __init__(self, args, file_type):
        # Consider only the first truncated_duration seconds of each iteration.
        self.truncated_duration = args.truncated_duration
        # Duration for which data is collected.
        self.cd = args.collected_duration
        ###################### Feature engineering parameters of the GLOBL channels ######################
        self.chunk_time = args.chunk_time  # Window size (in s) over which the spectrogram will be calculated
        # Parameters for resampling DVFS
        self.custom_num_datapoints = args.custom_num_datapoints  # Number of data points in the resampled time series
        self.resampling_type = args.resampling_type  # Type of resampling: 'max', 'min' or 'custom'
        self.resample = args.resample  # Whether or not to resample
        # Parameters for feature reduction (for DVFS file_type)
        self.reduced_frequency_size = args.reduced_frequency_size  # Size of the frequency axis after reduction
        self.reduced_time_size = args.reduced_time_size  # Size of the time axis after reduction
        self.reduced_feature_flag = args.feature_engineering_flag  # If True, perform feature reduction
        self.n_fft = args.n_fft  # Order of fft for stft
        # "dvfs" or "simpleperf"
        self.file_type = file_type
        ###################### Feature engineering parameters of the HPC channels ########################
        self.num_histogram_bins = args.num_histogram_bins

    def __call__(self, batch):
        '''
        Collate a batch of (X, label, path) samples.

        Returns (batch_tensor, labels_tensor, paths). If reduced_feature_flag is
        False, batch_tensor is a tuple of per-sample tensors (lengths may differ)
        rather than a stacked tensor, for both dvfs and simpleperf.

        Raises:
            ValueError   : unknown file_type, or dvfs feature reduction requested
                           with resample=False.
            RuntimeError : a helper raised StopIteration (see comment below).
        '''
        if self.file_type == "dvfs":
            collate = self._collate_dvfs
        elif self.file_type == "simpleperf":
            collate = self._collate_simpleperf
        else:
            raise ValueError('Incorrect file type provided to the collate function')
        # If StopIteration escapes a collate_fn, it propagates out of the DataLoader
        # iterator's __next__. The caller's `for batch in loader` then ends early,
        # with fewer batches than len(loader) and no error. (This certainly happens
        # with num_workers=0 and likely with worker processes too.) Make it loud.
        try:
            batch_tensor, batch_labels, batch_paths = collate(batch)
        except StopIteration as err:
            raise RuntimeError(
                f'StopIteration raised while collating a "{self.file_type}" batch; '
                'it would otherwise silently end iteration over the DataLoader'
            ) from err
        return batch_tensor, torch.tensor(batch_labels), batch_paths

    def _collate_dvfs(self, batch):
        '''Resample and optionally feature-reduce a batch of dvfs samples.'''
        # Per-sample tensors may differ in length (each iteration has its own sampling frequency).
        batch_dvfs, batch_labels, batch_paths = zip(*batch)
        if self.reduced_feature_flag and not self.resample:
            # target_fs is only produced by resample_dvfs; without it the reduction
            # step previously failed with UnboundLocalError.
            raise ValueError('dvfs feature reduction requires resample=True (target_fs comes from resample_dvfs)')
        if self.resample:
            # Resample so that each iteration in the batch has the same number of datapoints
            resampled_batch_dvfs, target_fs = self.resample_dvfs(batch_dvfs)
        else:
            resampled_batch_dvfs = batch_dvfs
        if not self.reduced_feature_flag:
            return resampled_batch_dvfs, batch_labels, batch_paths
        with warnings.catch_warnings():
            # NOTE: PCA warns for constant-valued time series; the reduced vector is then all zeros.
            # NumPy spells the ufunc "true_divide" or "divide" depending on version; match both.
            warnings.filterwarnings("ignore", message=r"invalid value encountered in (true_)?divide")
            batch_tensor = self.perform_feature_reduction(resampled_batch_dvfs, target_fs)
        return batch_tensor, batch_labels, batch_paths

    def _collate_simpleperf(self, batch):
        '''Optionally reduce each simpleperf sample to per-bin sums and stack the batch.'''
        batch_hpc, batch_labels, batch_paths = zip(*batch)
        if not self.reduced_feature_flag:
            # NOTE: a tuple of per-iteration tensors, not a stacked tensor.
            return batch_hpc, batch_labels, batch_paths
        reduced_batch_hpc = []
        for hpc_iter_tensor in batch_hpc:
            hpc_iter_tensor = self.truncate_hpc_tensor(hpc_iter_tensor)
            # Split the time axis (dim=1) into num_histogram_bins chunks and sum each chunk,
            # giving one value per channel per bin; concatenating yields
            # Nchannels * num_histogram_bins features (assuming 2-D Nchannels x T input).
            bin_sums = [torch.sum(chunk, dim=1) for chunk in torch.tensor_split(hpc_iter_tensor, self.num_histogram_bins, dim=1)]
            # Leading dimension of size 1 acts as the channel axis for compatibility.
            reduced_batch_hpc.append(torch.cat(bin_sums, dim=0).unsqueeze(0))
        return torch.stack(reduced_batch_hpc, dim=0), batch_labels, batch_paths
I have not attached the helper functions as they may not be necessary.
If you look at the `__call__()` method of the custom collate_fn object, it handles two different file types. For the “simpleperf” file type, the for loop over the DataLoader goes through all the batches. The issue is with the “dvfs” file type: the for loop terminates early, without going through all the batches.
Changing the number of workers did not change the result: the for loop still terminates after the same number of batches for the “dvfs” file type.