I’m doing timeseries classification and i use zarr as my data storage. I have written an iterable dataset to access zarr, which works with my small test-dataset but behaves weirdly once i move to my actual (much larger) dataset. The dataloader no longer works when i use multiple workers (it hangs after the first batch) and if i use only a single thread, performance starts out okish but gradually decreases until everything grinds to a halt (and uses excessive amounts of RAM, looks like a memory leak type of issue).
I’ve added the code for my dataset, worker_init_fn, and training function below. If anybody has any ideas about what i’m doing wrong, it would be greatly appreciated. ATM i don’t even have an idea how to debug this properly. I tried iterating over the dataset in a separate script like i do in the dataset, which works perfectly fine.
class Data(IterableDataset):
def __init__(self, path, start=None, end=None):
super(Data, self).__init__()
store = zarr.DirectoryStore(path)
self.array = zarr.open(store, mode='r')
if start is None:
start = 0
if end is None:
end = self.array.shape[0]
assert end > start
self.start = start
self.end = end
def __iter__(self):
return islice(self.array, self.start, self.end)
def worker_init_fn(worker_id):
worker_info = torch.utils.data.get_worker_info()
dataset = worker_info.dataset # the dataset copy in this worker process
overall_start = dataset.start
overall_end = dataset.end
# configure the dataset to only process the split workload
per_worker = int(
math.ceil(
(overall_end - overall_start) / float(worker_info.num_workers)
)
)
worker_id = worker_info.id
dataset.start = overall_start + worker_id * per_worker
dataset.end = min(dataset.start + per_worker, overall_end)
def train(self):
use_cuda = torch.cuda.is_available()
device = torch.device("cuda:0" if use_cuda else "cpu")
sxnet = SXNet()
sxnet.to(device)
optimizer = optim.Adam(
sxnet.parameters(),
lr=self.learning_rate
)
criterion = nn.BCEWithLogitsLoss()
traindata = Data(os.path.join(self.store, 'trainset1'))
trainloader = DataLoader(
dataset=traindata,
batch_size=50000,
shuffle=False,
num_workers=os.cpu_count(),
pin_memory=True,
worker_init_fn=worker_init_fn
)
params = {
'desc': 'epoch progress',
'smoothing': 0.01,
'total': math.ceil(
traindata.array.shape[0] / trainloader.batch_size)
}
for e in range(self.epochs): # loop over the dataset multiple times
epoch_loss = 0
epoch_acc = 0
epoch_len = 0
for batch in tqdm(trainloader, **params):
x = batch[:, 1:].to(device)
y = batch[:, 0].unsqueeze(0).T.to(device)
optimizer.zero_grad()
# forward + backward + optimize
y_pred = sxnet(x)
loss = criterion(y_pred, y)
acc = self.binary_acc(y_pred, y)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
epoch_acc += acc.item()
epoch_len += y.shape[1]
print(f'Epoch {e+0:03}: | '
f'Loss: {epoch_loss/epoch_len:.5f} | '
f'Acc: {epoch_acc/epoch_len:.3f}')
torch.save(sxnet.state_dict(), self.model_store)