Using multithreading with custom sampler dataloader

I’m trying to implement GQN and as for my data set,
after some transformations I’ve got 400 compressed .pt files, each including 2000 training samples.

The easiest way to apply a Dataset over it would be to use getitem able to calculate and decompress file in which given sample is stored in order to access it.
However it has huge disadvantage of having to decompress a file upon drawing any sample from Database.
I’d like avoid decompressing whole data set as it would use lots of my ROM, so I define two other approaches:

Store each of 2000 sample from file in separate compressed file - takes same amount of space and still needs to decompress on calling getitem but decompressed files are significantly smaller (still takes time to decompress).

Lastly I decide to iterate randomly over 2000 samples from one file and then move on to another random compressed file. This allows me to iterate over Dataset faster, but with my current implementation using multithreading actually slows my script down.

The code I’m using is as follows:

import torch
from torch.utils.data import Dataset, DataLoader, Sampler
import gzip
import os
from tqdm import tqdm

class ShepardMetzler(Dataset):
	def __init__(self, root_dir, transform= None):
		self.root_dir = root_dir
		self.transform = transform
		self.zipPaths = sorted([os.path.join(self.root_dir, fileName) for fileName in os.listdir(root_dir)])

		with gzip.open(self.zipPaths[0], "rb") as f:
			self.length = len(torch.load(f))*len(self.zipPaths)
			self.perZip = self.length / len(self.zipPaths)

		self.currentZip = -1 

	def __len__(self):
		return self.length

	def __getitem__(self, idx):

		if idx[0] != self.currentZip:
			self.currentZip = idx[0]
			with gzip.open(self.zipPaths[idx[0]], "rb") as f:
				self.currentTensor = torch.load(f)

		return self.currentTensor[idx[1]]

	def zipsLength(self):
		return self.perZip

	def zipsNumber(self):
		return len(self.zipPaths)


class SubSampler(Sampler):
	def __init__(self, dSet):
		self.dSet = dSet
		self.idxSplit = dSet.zipsLength()
		self.setsNumber = dSet.zipsNumber()
	
	def __len__ (self):
		return len(self.dSet)

	def __iter__(self):
		subSets = torch.randperm(self.setsNumber)
		for sSet in subSets:
			perm = torch.randperm(self.idxSplit)
			for sample in perm:
				yield [sSet, sample]
		return




dataset = ShepardMetzler(root_dir = "../shepard_metzler_5_parts/train")

mySampler = SubSampler(dataset)

dataloader = DataLoader(dataset, batch_size = 32, sampler=mySampler, num_workers = 0)



for batch in tqdm(dataloader):
	pass

The question is, how could I allow it to use multithreading?
I know DataLoader has worker_init_fn parameter but I’ve got no idea how to use it nor if it’s what I should be actually using.
Or could I somehow abuse multithreading creating DataLoader on top of samples provided by another DataLoader?
It would be great if I could get somehow each worker to work on different compressed files at same time.

I’m not sure about if I understood it.
In short you have lot of datasamples in a compressed file, thus it’s not word to decompress that for reading a single file.

What you can try to do is to generate those compressed files having (each of them) a balanced amount of samples of each class. Then you can customize the dataloader to load data from each compressed file, compressed file by compressed file.
I’m not sure if multiprocessing can help you, since it’s used to parallelize a function but you cannot accelerate (in a simple way) the decompression step in case of having a big file.

In case you use smaller files, notice the dataloader class provides multiprocessing. They also provide random sampling.

Hey and thanks for your answer.

I already have datasamples split into several compressed files and code I attached is used to do exactly what you specified.
However I wanted to speed it up as I’m able to decompress a few files at once,
so I wanted to simply specify to workers which file they should be working with.

fe at every epoch n lists of randomly chosen separate subsets of files are passed to n workers,
and each worker iterates randomly over samples from his given files, file by file.

I obviously know there is mupltiprocessing built in to DataLoader but it simply cannot work for code given above, without some additional knowledge, which I’m unable to pass.
And as I stated before I can’t afford using random sampling, cause it would force me to decompress random file upon drawing EVERY sample.