DataLoader and Postgres (or other SQL) an option?

Hi,

I’m trying to keep things in a postgres database, because - well, it’s complicated.

Is there anyone who’s done this in an efficient manner with the DataLoader and Dataset classes? I’m relatively proficient at Google-Fu, and no dice so far.

I really would prefer not to have to export from postgres to numpy arrays or csvs, but it seems that those are the best ways I can do this.

Please, someone prove me wrong if you can.

Thanks!

JQ

1 Like

Would it be possible to query your data using some kind of index? If so you could try to get the sample from your Postgres database in the __getitem__ function of your Dataset and try to convert it to a numpy array using e.g. warp_prism.
From there you could transform your numpy array to a tensor using torch.from_numpy.
I haven’t tried it and am absolutely no database expert, but it could be a potential way.
Let me know, if I’m completely missing some crucial aspects.

2 Likes

Actually… that might work. I need to think about it.

No… wait. It will work. Thanks. Brilliant. I should have thought of that myself.

2 Likes

Just thought I would share with ya. Since running so many single record queries can be a bottleneck for performance.

I found considerable speed up when querying my dataset by simply returning my indexes in the getitem function.
Then the list provided to a custom collate function can be converted to a tuple and used with a WHERE IN query which returns the batch records in a single query,

3 Likes

I ran into this issue too. I made two dataloaders. First dataloader checks length of table you are querying on __init__ and then __getitem__ spits out a large amount of row IDs from the database. Second dataloader is initialized with these indices as an __init__ parameter, grabs all of those indices from the DB and sets them to self.X. You can then batch off of that second loader to train your model.

Still interested in other solutions, but this solution gets rid of the problem of running select statements on every __getitem__ call.

ex. pseudocode:

# Indices is array of row IDs from SQL (mabye 1,000,000 at a time)
for indices in partition_loader:
    # Second loader initialized within this loop w/ those indices
    loader = data.DataLoader(local_dataset(indices, **params) **loader_params)
    for batch_x, batch_y in loader:
        fit_batch(batch_x, batch_y)
1 Like

I think that this would probably be a cleaner way to do it than the one we settled on. Possibly more efficient as well.

1 Like

So I obtained batches directly from the psql db with psycopg2 using a single dataloader and dataset.

I was using a connection pool in my instantiation of the dataset object.

I have since switched to fast5 files for my training datasets and using pandas for performance reasons but I prefer the data wrangling capabilities of SQL. Would be nice if training your model this way was more efficient.

def __getitem__(self, idx: int) -> int:
    return idx

def collate(self, results: list) -> tuple:
    query = """SELECT label, feature FROM dataset WHERE index IN %s"""
    result = None
    conn = self.conn_pool.getconn()

    try:
        conn.set_session(readonly=True)
        cursor = conn.cursor()
        cursor.execute(query, tuple(results,))
        result = cursor.fetchall()

    except Error as conn_pool_error:
        print(conn_pool_error)

    finally:
        self.conn_pool.putconn(conn)
   #format batches here
   #format batches here

    if result is not None:
        return result
    else: 
        throw Exception('problem in obtaining batches')
            

1 Like

I found it easier to not use Dataset and Dataloader at all and rely on async. Here’s a writeup (with code): https://medium.com/@jonathan.wickens/using-postgres-as-a-dataloader-with-pytorch-bba0d5cbe1fa

1 Like

@jwickens

Is there a way to wrap this in a DataLoader? PyTorch Lightning requires the use of DataLoaders.

Maybe:

class DBIterDataset (pytorch.IterableDataset):
  def __init__ (self, ait, loop):
    self.ait = ait
    self.loop = loop
  def __iter__ (self):
    return wrap_async_iter (self.ait, self.loop)