Dataloader is not iterating through all the batches

I have a dataloader which returns a tensor of shape (BatchSize, Nchannels, featureSize).

When I print out the number of samples and batches in the dataloader using the following snippet, I get the correct number of samples and batches:

print(f" - Number of samples : {len(dataloaderX.dataset)} | Number of batches: {len(dataloaderX)}")

When I run through the batches in a loop using enumerate,

for batch_idx,(batch_tensor, batch_labels, batch_paths) in enumerate(dataloaderX):
      <Processing the tensor here>      

the loop terminates without error, but it covers only a fraction of the batches in the dataloader. For example, if the dataloader has 159 batches, the loop finishes after covering only 36 of them. I am not sure where the issue lies. Any pointers on how to debug this would be appreciated. I have written my own dataset class, dataloader function, and collate_fn.

Have you checked if the reported values make sense given the type of sampler used/other conditions as specified in the docs?

    .. warning:: ``len(dataloader)`` heuristic is based on the length of the sampler used.
                 When :attr:`dataset` is an :class:`~torch.utils.data.IterableDataset`,
                 it instead returns an estimate based on ``len(dataset) / batch_size``, with proper
                 rounding depending on :attr:`drop_last`, regardless of multi-process loading
                 configurations. This represents the best guess PyTorch can make because PyTorch
                 trusts user :attr:`dataset` code in correctly handling multi-process
                 loading to avoid duplicate data.

                 However, if sharding results in multiple workers having incomplete last batches,
                 this estimate can still be inaccurate, because (1) an otherwise complete batch can
                 be broken into multiple ones and (2) more than one batch worth of samples can be
                 dropped when :attr:`drop_last` is set. Unfortunately, PyTorch can not detect such
                 cases in general.

                 See `Dataset Types`_ for more details on these two types of datasets and how
                 :class:`~torch.utils.data.IterableDataset` interacts with
                 `Multi-process data loading`_.

https://pytorch.org/docs/stable/_modules/torch/utils/data/dataloader.html#DataLoader
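
As a rough illustration of the estimate described in the quoted warning, the batch count follows from the dataset length, the batch size, and drop_last. The helper name `expected_num_batches` and the sample counts below are made up for this sketch; it mirrors the documented rounding rather than reproducing PyTorch's source.

```python
import math


def expected_num_batches(num_samples: int, batch_size: int, drop_last: bool = False) -> int:
    """Batch count implied by len(dataset) / batch_size with drop_last rounding."""
    if drop_last:
        # Incomplete final batch is discarded, so round down.
        return num_samples // batch_size
    # Incomplete final batch is kept, so round up.
    return math.ceil(num_samples / batch_size)


# Hypothetical numbers: 1267 samples with a batch size of 8.
print(expected_num_batches(1267, 8))                  # 159
print(expected_num_batches(1267, 8, drop_last=True))  # 158
```

If this number matches len(dataloader) but the loop sees fewer batches, the length estimate is not the culprit and the iteration itself is ending early.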

Yes, the reported values make sense. The only thing that doesn’t make sense is why the loop is not covering all the batches.

Does it change with the number of workers? It might be useful to post your implementations (you can use random data to avoid any dependencies on datasets).

Here’s the dataset class:

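import warnings

import torch
import torch.utils.data as data_utils  # assumed alias; the code below uses data_utils.Subset

class arm_telemetry_data(torch.utils.data.Dataset):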
    ''' 
    This is the Dataset object.
    A Dataset object loads the training or test data into memory.
    Your custom Dataset class MUST inherit torch's Dataset
    Your custom Dataset class should have 3 methods implemented (you can add more if you want but these 3 are essential):
    __init__(self) : Performs data loading
    __getitem__(self, index) :  Will allow for indexing into the dataset eg dataset[0]
    __len__(self) : len(dataset)
    '''

    def __init__(self, partition, labels, split, file_type, normalize):
        '''
            -labels = {file_path1 : 0, file_path2: 0, ...}

            -partition = {'train' : [file_path1, file_path2, ..],
                                'trainSG' : [file_path1, file_path2, ..],
                                'test' : [file_path1, file_path2]}

            -split = 'train', 'trainSG', or 'test'
            -file_type = 'dvfs' or 'simpleperf' [Different parsers for different file types]                    
        '''
        if(split not in ['train','trainSG','test']):
            raise NotImplementedError('Can only accept Train, TrainSG, Test')

        # Store the list of paths (ids) in the split
        self.path_ids = partition[split] 
        
        # Store the list of labels
        self.labels = labels
        # Whether or not you want to normalize the data [default=True]
        self.normalize = normalize
        # File type for selecting the parser module
        self.file_type = file_type

    def __len__(self):
        return len(self.path_ids)
     
    def __getitem__(self, idx):
        # Select the sample [id = file path of the dvfs file]
        id = self.path_ids[idx]

        # Get the label
        y = self.labels[id]

        if self.file_type == 'dvfs':
            # Read and parse the file. NOTE: Each dvfs file has a different sampling frequency
            X = self.read_dvfs_file(id)
        elif self.file_type == 'simpleperf':
            # Read and parse the simpleperf file
            X = self.read_simpleperf_file(id)
        else:
            raise ValueError('Incorrect file type provided to the dataloader')
        X_std = X

        # Normalize
        if self.normalize:
            # X : Nchannel x num_data_points

            # Calculate mean of each channel
            mean_ch = torch.mean(X,dim=1)
            
            # Calculate std of each channel
            std_ch = torch.std(X,dim=1)
            floor_std_ch = torch.tensor([1e-12]*X.shape[0]) 
            std_ch = torch.maximum(std_ch,floor_std_ch) # To avoid the division by zero error
            
            # Normalize
            X_std = (X - torch.unsqueeze(mean_ch,1))/torch.unsqueeze(std_ch,1)
            
        # Return the dvfs/hpc tensor (X_std), the corresponding label (y), and the corresponding file path that contains the name (id)
        return X_std,y,id

Here’s the function that creates the dataloader:

def get_dataloader(args, partition, labels, custom_collate_fn, required_partitions, normalize_flag, file_type, N=None):
    '''
    Returns the dataloader objects for the different partitions.

    params: 
        -partition = {'train' : [file_path1, file_path2, ..],
                            'test' : [file_path1, file_path2, ..],
                            'val' : [file_path1, file_path2]}
                            
        -labels : {file_path1 : 0, file_path2: 1, ...}  (Benigns have label 0 and Malware have label 1)
        
        -custom_collate_fn : Custom collate function object (Resamples and creates a batch of spectrogram B,T_chunk,Nch,H,W)

        -required_partitions : required_partitions = {"train":T or F, "trainSG":T or F, "test":T or F}           
        
        -N  : [num_training_samples, num_trainSG_samples, num_testing_samples]
                If N is specified, then we are selecting a subset of files from the dataset 

        -normalize_flag : Will normalize the data if set to True. [Should be set to True for 'dvfs' and False for 'simpleperf'] 

        -file_type : 'dvfs' or 'simpleperf' -> Different parsers used for each file_type

    Output: 
        - trainloader, trainSGloader, testloader : Dataloader object for train, trainSG, and test data.
    '''
    trainloader, trainSGloader, testloader = None, None, None

    # Initialize the custom dataset class for training, validation, and test data
    if required_partitions["train"]:
        ds_train_full = arm_telemetry_data(partition, labels, split='train', file_type= file_type, normalize=normalize_flag)
    
    if required_partitions["trainSG"]:
        ds_trainSG_full = arm_telemetry_data(partition, labels, split='trainSG', file_type= file_type, normalize=normalize_flag)
    
    if required_partitions["test"]:
        ds_test_full = arm_telemetry_data(partition, labels, split='test', file_type= file_type, normalize=normalize_flag)

    if N is not None:
        # You are using a subset of the complete dataset
        # Validate the length first, otherwise the print below can fail with an IndexError
        if len(N) != 3:
            raise NotImplementedError('Size of Array should be 3')
        print(f'[Info] ############### Using Subset : Num_train = {N[0]}, Num_trainSG = {N[1]}, Num_test = {N[2]} ##################')

        if (required_partitions["train"]):
            if (N[0] > ds_train_full.__len__()):
                raise NotImplementedError(f"More samples than present in DS. Demanded : {N[0]} | Available: {ds_train_full.__len__()}")
            else:
                indices = torch.arange(N[0])
                ds_train = data_utils.Subset(ds_train_full, indices)

        if (required_partitions["trainSG"]):
            if (N[1] > ds_trainSG_full.__len__()):
                raise NotImplementedError(f'More samples than present in DS. Demanded : {N[1]} | Available: {ds_trainSG_full.__len__()}')
            else:
                indices = torch.arange(N[1])
                ds_trainSG = data_utils.Subset(ds_trainSG_full, indices)

        if (required_partitions["test"]):
            if (N[2] > ds_test_full.__len__()):
                raise NotImplementedError(f'More samples than present in DS. Demanded : {N[2]} | Available: {ds_test_full.__len__()}')
            else:
                indices = torch.arange(N[2])
                ds_test = data_utils.Subset(ds_test_full, indices)

    else: 
        # Using the complete dataset
        if (required_partitions["train"]):
            ds_train = ds_train_full
            
        if (required_partitions["trainSG"]):
            ds_trainSG = ds_trainSG_full
            
        if (required_partitions["test"]):
            ds_test = ds_test_full
            
    
    # Create the dataloader object for training, validation, and test data
    if (required_partitions["train"]):
        trainloader = torch.utils.data.DataLoader(
            ds_train,
            num_workers=args.num_workers,
            batch_size=args.train_batchsz,
            collate_fn=custom_collate_fn,
            shuffle=args.train_shuffle,
        )

    if (required_partitions["trainSG"]):    
        trainSGloader = torch.utils.data.DataLoader(
            ds_trainSG,
            num_workers=args.num_workers,
            batch_size=args.train_batchsz,
            collate_fn=custom_collate_fn,
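            shuffle=args.test_shuffle,  # NOTE: must be False here; DataLoader raises ValueError if shuffle=True is combined with a sampler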
            sampler = torch.utils.data.SequentialSampler(ds_trainSG)
        )

    if (required_partitions["test"]):
        testloader = torch.utils.data.DataLoader(
            ds_test,
            num_workers=args.num_workers,
            batch_size=args.test_batchsz,
            collate_fn=custom_collate_fn,
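            shuffle=args.test_shuffle,  # NOTE: must be False here; DataLoader raises ValueError if shuffle=True is combined with a sampler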
            sampler = torch.utils.data.SequentialSampler(ds_test)
        )

    return trainloader, trainSGloader, testloader

And this is the custom collate_fn object:

class custom_collator(object):
    def __init__(self, args, file_type):
        # Parameters for truncating the dvfs and hpc time series. Consider the first truncated_duration seconds of the iteration
        self.truncated_duration = args.truncated_duration
        # Duration for which data is collected 
        self.cd = args.collected_duration 
        
        ###################### Feature engineering parameters of the GLOBL channels ######################
        self.chunk_time = args.chunk_time # Window size (in s) over which the spectrogram will be calculated  
        
        # Parameters for resampling DVFS
        self.custom_num_datapoints = args.custom_num_datapoints # Number of data points in the resampled time series
        self.resampling_type = args.resampling_type # Type of resampling. Can take one of the following values: ['max', 'min', 'custom']
        self.resample = args.resample # Whether or not to resample. Default : True

        # Parameters for feature reduction (for DVFS file_type)
        self.reduced_frequency_size = args.reduced_frequency_size # dimension of frequency axis after dimensionality reduction
        self.reduced_time_size = args.reduced_time_size # dimension of time axis after dimensionality reduction
        self.reduced_feature_flag = args.feature_engineering_flag # If True, then we perform feature reduction. Default is False.
        self.n_fft = args.n_fft # Order of fft for stft

        # For selecting file_type : "dvfs" or "simpleperf"
        self.file_type = file_type

        ###################### Feature engineering parameters of the HPC channels ########################
        # Feature engineering parameters for simpleperf files
        self.num_histogram_bins = args.num_histogram_bins

    def __call__(self, batch):
        '''
        Takes a batch of files, outputs the batch tensor, the corresponding labels, and the corresponding file paths
        - If reduced_feature_flag is False, then will return a list instead of a stacked tensor, for both dvfs and simpleperf
        '''
        if self.file_type == "dvfs":
            # batch_dvfs : [iter1, iter2, ... , iterB]  (NOTE: iter1 - Nchannels x T1 i.e. Every iteration has a different length. Duration of data collection is the same. Sampling frequency is different for each iteration)
            # batch_labels : [iter1_label, iter2_label, ...iterB_label]
            # batch_paths : [iter1_path, iter2_path, ...iterB_path]
            batch_dvfs, batch_labels, batch_paths = list(zip(*batch))

            if self.resample:
                # Resample so that each iteration in the batch has the same number of datapoints
                resampled_batch_dvfs, target_fs = self.resample_dvfs(batch_dvfs)
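            else:
                resampled_batch_dvfs = batch_dvfs
                # No resampling, so there is no target sampling frequency; define it so the call below does not hit a NameError
                target_fs = None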

            with warnings.catch_warnings():
                # NOTE: PCA will raise warning for time series with constant value. This is fine. The feature reduced vector will be all zeros.
                warnings.filterwarnings("ignore", message="invalid value encountered in true_divide")
                # Perform feature reduction on the batch of resampled dataset, so that number of features for every sample = 40
                if self.reduced_feature_flag: #If reduced_feature_flag is set to True, then perform feature reduction
                    batch_tensor = self.perform_feature_reduction(resampled_batch_dvfs, target_fs)
            
                else: # Just pass the resampled dvfs data
                    batch_tensor = resampled_batch_dvfs
         

        elif self.file_type == "simpleperf":
            # batch_hpc : [iter1, iter2, ... , iterB]  (NOTE: iter1 - Nchannels x T1 i.e. Every iteration has a different length. Duration of data collection is the same. Sampling frequency is different for each iteration)
            # batch_labels : [iter1_label, iter2_label, ...iterB_label]
            # batch_paths : [iter1_path, iter2_path, ...iterB_path]
            batch_hpc, batch_labels, batch_paths = list(zip(*batch))

            if self.reduced_feature_flag:
                # Stores the dimension reduced hpc for each iteration in the batch
                reduced_batch_hpc = []

                # Divide the individual variates of the tensor into num_histogram_bins. And sum over the individual intervals to form feature size of 32 for each variate.
                for hpc_iter_tensor in batch_hpc:
                    # Take the truncated duration of the tensor
                    hpc_iter_tensor = self.truncate_hpc_tensor(hpc_iter_tensor)
                    
                    ## hpc_intervals : [chunks of size - Nchannels x chunk_size] where chunk_size = lengthOfTimeSeries/self.num_histogram_bins
                    hpc_intervals = torch.tensor_split(hpc_iter_tensor, self.num_histogram_bins, dim=1)
                    
                    # Take sum along the time dimension for each chunk to get chunks of size -  Nchannels x 1
                    sum_hpc_intervals = [torch.sum(hpc_int,dim=1, keepdim=False) for hpc_int in hpc_intervals]
                    
                    # Concatenate the bins to get the final feature tensor
                    hpc_feature_tensor = torch.cat(sum_hpc_intervals, dim=0)
                    
                    # Adding one dimension for channel [for compatibility purpose]. N_Channel = 1 in this case.
                    reduced_batch_hpc.append(torch.unsqueeze(hpc_feature_tensor, dim=0)) 

                batch_tensor = torch.stack(reduced_batch_hpc, dim=0)
            
            else:
                # NOTE: This is not a tensor. It is a list of the iterations.
                batch_tensor = batch_hpc 

        return batch_tensor, torch.tensor(batch_labels), batch_paths

I have not attached the helper functions as they may not be necessary.

If you look at the __call__() method of the custom collate_fn object, there are two different file types. For the “simpleperf” file type, the for loop iterates through all the batches. The issue is with the “dvfs” file type, where the for loop terminates early without going through all the batches.

Changing the number of workers did not change the result. The for loop still terminates with the same number of batches for the “dvfs” file type.
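
Because the batch count is the same for any number of workers, one way to narrow this down is to bypass the DataLoader and index the dataset directly, recording which samples raise. A minimal sketch, assuming a map-style dataset; `scan_dataset` and `ToyDataset` are hypothetical names, not part of the code posted above.

```python
def scan_dataset(dataset):
    """Index every sample and collect (index, exception) pairs for those that raise."""
    failures = []
    for idx in range(len(dataset)):
        try:
            dataset[idx]
        except Exception as exc:  # StopIteration is also a subclass of Exception
            failures.append((idx, exc))
    return failures


class ToyDataset:
    """Hypothetical stand-in for a map-style dataset with one bad sample."""

    def __len__(self):
        return 5

    def __getitem__(self, idx):
        if idx == 3:
            raise ValueError(f"cannot parse sample {idx}")
        return idx


for idx, exc in scan_dataset(ToyDataset()):
    print(f"sample {idx}: {type(exc).__name__}: {exc}")
```

Running the same scan on the real dataset object (for example `dataloaderX.dataset`) would list the file indices whose parser fails, along with the exception type.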

It’s not really straightforward to debug without a runnable code snippet. I would check the conditions that cause StopIteration to be raised in the dataloader and check why it is happening prematurely:
https://pytorch.org/docs/stable/_modules/torch/utils/data/dataloader.html#DataLoader
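
To illustrate why a premature StopIteration is hard to spot: a Python for loop treats any StopIteration that escapes the iterator's `__next__` as the normal end of iteration, so no error is shown. The sketch below is pure Python and only mimics the shape of a batching loader; `TinyLoader` and `FlakyDataset` are hypothetical stand-ins, not PyTorch's implementation.

```python
class FlakyDataset:
    """Hypothetical map-style dataset whose __getitem__ leaks StopIteration."""

    def __init__(self, n, bad_index):
        self.n = n
        self.bad_index = bad_index

    def __len__(self):
        return self.n

    def __getitem__(self, idx):
        if idx == self.bad_index:
            # Stand-in for a parser calling next() on an exhausted iterator.
            return next(iter([]))
        return idx


class TinyLoader:
    """Simplified stand-in for a batching loader (not the real DataLoader)."""

    def __init__(self, dataset, batch_size):
        self.dataset = dataset
        self.batch_size = batch_size

    def __len__(self):
        # Derived from the dataset length only (ceiling division).
        return -(-len(self.dataset) // self.batch_size)

    def __iter__(self):
        self._pos = 0
        return self

    def __next__(self):
        if self._pos >= len(self.dataset):
            raise StopIteration
        end = min(self._pos + self.batch_size, len(self.dataset))
        # A StopIteration raised by dataset[i] propagates out of __next__ here.
        batch = [self.dataset[i] for i in range(self._pos, end)]
        self._pos = end
        return batch


loader = TinyLoader(FlakyDataset(n=20, bad_index=9), batch_size=4)
seen = sum(1 for _ in loader)
print(f"len(loader) = {len(loader)}, batches seen = {seen}")
```

Here the reported length is 5, but the loop ends silently after 2 batches because the third batch contains the bad sample.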

I figured out the problem. One of the parsers for reading a file was raising StopIteration which was resulting in early termination of the for loop. Thanks a lot for providing a direction!
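
How the parser raised StopIteration is not shown in this thread; a common cause is calling next() on an exhausted iterator without a default. One defensive pattern, sketched under that assumption, is to wrap the parser so a leaked StopIteration becomes a RuntimeError that the training loop cannot swallow. `no_stop_iteration` and `parse_first_line` are hypothetical names for illustration.

```python
import functools


def no_stop_iteration(fn):
    """Re-raise a leaked StopIteration as RuntimeError so it cannot end a loop silently."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except StopIteration as exc:
            raise RuntimeError(f"{fn.__name__} raised StopIteration") from exc

    return wrapper


@no_stop_iteration
def parse_first_line(lines):
    # Hypothetical parser: next() without a default raises StopIteration on empty input.
    return next(iter(lines))


print(parse_first_line(["header", "row"]))  # header
```

With such a guard around the file parsers, an empty or malformed file would surface as a traceback pointing at the parser instead of a shortened epoch.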