Iterable dataset with zarr as storage backend results in degrading performance

I’m doing time-series classification and I use zarr as my data storage. I have written an iterable dataset to access the zarr store, which works with my small test dataset but behaves strangely once I move to my actual (much larger) dataset. The dataloader no longer works when I use multiple workers (it hangs after the first batch), and if I use only a single worker, performance starts out okay but gradually degrades until everything grinds to a halt (and uses excessive amounts of RAM, which looks like a memory leak).

I’ve added the code for my dataset, worker_init_fn, and training function below. If anybody has any ideas about what I’m doing wrong, it would be greatly appreciated. At the moment I don’t even know how to debug this properly. I tried iterating over the zarr array in a separate script, the same way the dataset does, and that works perfectly fine.

from itertools import islice

import zarr
from torch.utils.data import IterableDataset


class Data(IterableDataset):
    def __init__(self, path, start=None, end=None):
        super(Data, self).__init__()
        store = zarr.DirectoryStore(path)
        self.array = zarr.open(store, mode='r')

        if start is None:
            start = 0
        if end is None:
            end = self.array.shape[0]

        assert end > start

        self.start = start
        self.end = end

    def __iter__(self):
        # yield rows self.start .. self.end via islice over the array's row iterator
        return islice(self.array, self.start, self.end)

def worker_init_fn(worker_id):
    # split the dataset's [start, end) range evenly across DataLoader workers
    worker_info = torch.utils.data.get_worker_info()
    dataset = worker_info.dataset  # the dataset copy in this worker process
    overall_start = dataset.start
    overall_end = dataset.end
    # configure the dataset to only process the split workload
    per_worker = int(
        math.ceil(
            (overall_end - overall_start) / float(worker_info.num_workers)
        )
    )
    worker_id = worker_info.id
    dataset.start = overall_start + worker_id * per_worker
    dataset.end = min(dataset.start + per_worker, overall_end)
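
For reference, the split arithmetic can be sanity-checked in isolation. The numbers below (200,000 rows, 8 workers) are made up purely to illustrate what worker_init_fn does to each worker's copy of the dataset:

import math

overall_start, overall_end, num_workers = 0, 200000, 8  # example values only
per_worker = int(math.ceil((overall_end - overall_start) / float(num_workers)))
for worker_id in range(num_workers):
    start = overall_start + worker_id * per_worker
    end = min(start + per_worker, overall_end)
    print(worker_id, start, end)  # worker 0 -> [0, 25000), worker 1 -> [25000, 50000), ...
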
def train(self):
        use_cuda = torch.cuda.is_available()
        device = torch.device("cuda:0" if use_cuda else "cpu")

        sxnet = SXNet()
        sxnet.to(device)
        optimizer = optim.Adam(
            sxnet.parameters(),
            lr=self.learning_rate
        )

        criterion = nn.BCEWithLogitsLoss()

        traindata = Data(os.path.join(self.store, 'trainset1'))
        trainloader = DataLoader(
            dataset=traindata,
            batch_size=50000,
            shuffle=False,
            num_workers=os.cpu_count(),
            pin_memory=True,
            worker_init_fn=worker_init_fn
        )

        params = {
            'desc': 'epoch progress',
            'smoothing': 0.01,
            'total': math.ceil(
                traindata.array.shape[0] / trainloader.batch_size)
        }

        for e in range(self.epochs):  # loop over the dataset multiple times
            epoch_loss = 0
            epoch_acc = 0
            epoch_len = 0
            for batch in tqdm(trainloader, **params):
                # first column of each batch row is the label, the rest are features
                x = batch[:, 1:].to(device)
                y = batch[:, 0].unsqueeze(0).T.to(device)

                optimizer.zero_grad()

                # forward + backward + optimize
                y_pred = sxnet(x)
                loss = criterion(y_pred, y)
                acc = self.binary_acc(y_pred, y)

                loss.backward()
                optimizer.step()

                epoch_loss += loss.item()
                epoch_acc += acc.item()
                epoch_len += y.shape[1]

            print(f'Epoch {e+0:03}: | '
                  f'Loss: {epoch_loss/epoch_len:.5f} | '
                  f'Acc: {epoch_acc/epoch_len:.3f}')

            torch.save(sxnet.state_dict(), self.model_store)

Since I’ve returned to working on this project, I was able to find the memory leak issue. The problem was in my validation loop:

# validation loop: accumulate predictions and labels for the whole test set
with torch.no_grad():
    for batch in tqdm(testloader, **params):
        x = batch[:, 1:].to(device)
        y = batch[:, 0].unsqueeze(0).T
        y_test_pred = torch.sigmoid(sxnet(x))
        y_pred_tag = torch.round(y_test_pred)
        y_pred_list.append(y_pred_tag.cpu().numpy())
        y_list.append(y.numpy())

Saving the results like this used far more memory than I originally thought. Since this is a binary classification task, my results are 1×n vectors, so I was able to call .flatten() on the numpy arrays before appending them, which solved the excessive RAM use.
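
Concretely, the appends in the loop above now look roughly like this (a minimal sketch; the np.concatenate step and the *_all names are just illustrative, not part of my actual code):

import numpy as np

# inside the validation loop: store flat 1-D arrays instead of (n, 1) ones
y_pred_list.append(y_pred_tag.cpu().numpy().flatten())
y_list.append(y.numpy().flatten())

# after the loop, each list collapses into a single flat vector
y_pred_all = np.concatenate(y_pred_list)
y_all = np.concatenate(y_list)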

Unfortunately I’m still facing the problem that if I use num_workers > 1 for my dataloader, it just hangs on the second batch. CPU usage is still there, but it never finishes the second loop iteration. I have a second dataset that I created by removing the class imbalance from the original one, and on that data it works like a charm. From the dataloader’s perspective, the only difference between the two datasets should be the number of rows.

I went ahead and compared the .zarray files of my two datasets to be sure that there wasn’t some data type nonsense going on.

{
    "chunks": [
        50000,
        290
    ],
    "compressor": {
        "blocksize": 0,
        "clevel": 5,
        "cname": "lz4",
        "id": "blosc",
        "shuffle": 1
    },
    "dtype": "<f4",
    "fill_value": 0.0,
    "filters": null,
    "order": "C",
    "shape": [
        4702488,
        290
    ],
    "zarr_format": 2
}

{
    "chunks": [
        50000,
        290
    ],
    "compressor": {
        "blocksize": 0,
        "clevel": 5,
        "cname": "lz4",
        "id": "blosc",
        "shuffle": 1
    },
    "dtype": "<f4",
    "fill_value": 0.0,
    "filters": null,
    "order": "C",
    "shape": [
        477285033,
        290
    ],
    "zarr_format": 2

Everything looks identical except for the number of rows.
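
(For what it’s worth, the same check can be done from Python instead of reading the .zarray files by hand; the store path and names below are placeholders:)

import os
import zarr

store_root = '/path/to/stores'  # placeholder
for name in ('trainset1', 'origin'):  # placeholder store names
    arr = zarr.open(zarr.DirectoryStore(os.path.join(store_root, name)), mode='r')
    print(name, arr.shape, arr.chunks, arr.dtype, arr.compressor)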

This is weird. I’ve now tested the dataloader with my data in a test function, as outlined in the IterableDataset documentation:

def test_dataloader(self):
        traindata = Data(
            os.path.join(self.store, 'origin'),
            0,
            200000
        )

        trainloader = DataLoader(
            dataset=traindata,
            batch_size=50000,
            shuffle=False,
            num_workers=os.cpu_count(),
            worker_init_fn=worker_init_fn
        )

        print(list(trainloader))

This works perfectly. Inside the training function, it still hangs.

Oof, I’m a moron :see_no_evil:

Of course each worker process has to open the dataset once, which takes quite a while for a 300 GB+ zarr store. Once all the workers have opened the store, everything runs quite quickly. So for the first os.cpu_count() batches I see terrible performance, but after that it works fine.

I was once again a bit hasty in my conclusion. The actual issue is that I did not fully understand how islice works. islice takes the underlying iterator and consumes it up to the start index you give it, discarding every element along the way; only after that does it start yielding elements of the array. For a large array this is extremely inefficient, hence the long wait times I saw at startup. The solution was to modify the storage backend (zarr) so that its built-in iteration can start at a custom index. I’ve submitted a pull request that should land in the next minor release of zarr.
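
To make that concrete: until that change lands, one workaround is to skip the array's default iterator entirely and read chunk-sized blocks starting at self.start. The snippet below is only a sketch of that idea against the Data class above, not the actual zarr patch:

def __iter__(self):
    # read whole blocks starting at self.start instead of islice-ing the
    # default row iterator, which would decompress and discard every row
    # before self.start first
    block = self.array.chunks[0]  # rows per stored chunk (50000 here)
    for i in range(self.start, self.end, block):
        rows = self.array[i:min(i + block, self.end)]  # one decompressed numpy block
        yield from rows

With that, each worker only ever reads the chunks overlapping its own [start, end) slice.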
