PyTorch S3FileLoader help

Hi,

I am trying to create a DataLoader that streams image chips stored in an AWS S3 bucket.

I currently have a CSV file which contains the download URLs for:

  • a Sentinel-1 image chip,
  • a matching Sentinel-2 image chip,
  • the matching mask layer chip.

I have been trying to create a datapipe using torchdata's S3FileLister and S3FileLoader, but I am stuck!

I need to create a dataset/datapipe that streams the matching Sentinel-1 and Sentinel-2 chips, merges them into one tensor, and also streams the corresponding mask. Currently I can stream a single image chip (or a batch), but I do not know how to write the class that can stream multiple chips and perform the merging.

CURRENT CODE

```python
import numpy as np
from rasterio.io import MemoryFile
from torch.utils.data import DataLoader
from torchdata.datapipes.iter import S3FileLister, S3FileLoader

tests2 = ['s3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S2_00.tif',
          's3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S2_01.tif',
          's3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S2_05.tif',
          's3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S2_06.tif']

tests1 = ['s3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_00.tif',
          's3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_01.tif',
          's3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_05.tif',
          's3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_06.tif']


def row_processer(row):
    # row is a (url, stream) tuple yielded by S3FileLoader;
    # open the stream with rasterio and read the full raster as float32
    with MemoryFile(row[1]) as memfile:
        with memfile.open() as dataset:
            data_array = dataset.read().astype(np.float32)
    return data_array


def build_datapipes(urls):
    datapipe = S3FileLister(urls)
    datapipe = S3FileLoader(datapipe)
    datapipe = datapipe.shuffle()
    datapipe = datapipe.map(row_processer)
    return datapipe


if __name__ == '__main__':
    datapipe = build_datapipes(tests1)
    dl = DataLoader(dataset=datapipe, batch_size=2, num_workers=0)
    first = next(iter(dl))
    print(f"Labels batch shape: {first.size()}")
```

Using Zipper I can load both, but I am still unsure how to concatenate them together now!

```python
from torchdata.datapipes import iter as pipes
from torchdata.datapipes.iter import S3FileLister, S3FileLoader

datapipe = S3FileLister(tests1)
datapipe = S3FileLoader(datapipe)
datapipe = datapipe.shuffle()
datapipe = datapipe.map(row_processer)

datapipe1 = S3FileLister(tests2)
datapipe1 = S3FileLoader(datapipe1)
datapipe1 = datapipe1.shuffle()
datapipe1 = datapipe1.map(row_processer)

# zip the two streams element-wise, then shuffle and batch the pairs
ds = pipes.Zipper(datapipe1, datapipe).shuffle().batch(2)

batch = next(iter(ds))

for mk, dt in batch:
    print(mk.shape)
    print(dt.shape)
```

Output:

```
(11, 256, 256)
(4, 256, 256)
(11, 256, 256)
(4, 256, 256)
```

Sorry for the very messy code, I am a bit out of my depth!

OK, so I have tried loading directly from the CSV, but I am running into this issue now.

```python
import torchdata.datapipes as dp

FOLDER = '/content/drive/MyDrive/biomassters/training_links/'

def s2getter(data):
    return data[1]

def s1getter(data):
    return data[2]

def maskgetter(data):
    return data[0]

datapipe = dp.iter.FileLister([FOLDER]).filter(filter_fn=lambda filename: filename.endswith('ed.csv'))
datapipe = dp.iter.FileOpener(datapipe, mode='rt')
datapipe = datapipe.parse_csv(skip_lines=1, delimiter=',')

# one branch per CSV column: Sentinel-2 URLs, Sentinel-1 URLs, mask URLs
datapipe1 = datapipe.map(s2getter)
datapipe1 = S3FileLoader(datapipe1)
datapipe1 = datapipe1.map(row_processer)

datapipe2 = datapipe.map(s1getter)
datapipe2 = S3FileLoader(datapipe2)
datapipe2 = datapipe2.map(row_processer)

datapipe3 = datapipe.map(maskgetter)
datapipe3 = S3FileLoader(datapipe3)
datapipe3 = datapipe3.map(row_processer)

ds = pipes.Zipper(datapipe1, datapipe2).shuffle().batch(1)

batch = next(iter(ds))

for mk, dt in batch:
    print(mk.shape)
    print(dt.shape)
```

"RuntimeError: This iterator has been invalidated because another iterator has been createdfrom the same IterDataPipe: CSVParserIterDataPipe()
This may be caused multiple references to the same IterDataPipe. We recommend using `.fork()` if that is necessary.
For feedback regarding this single iterator per IterDataPipe constraint, feel free to comment on this issue: https://github.com/pytorch/data/issues/45.
This exception is thrown by __iter__ of CSVParserIterDataPipe()"
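
From the error message, it looks like mapping the same parse_csv datapipe three times is the problem: a single IterDataPipe only supports one active iterator, so the three branches invalidate each other. A minimal, untested sketch of the `.fork()` approach the message recommends, reusing the same `FOLDER`, getter functions, and `row_processer` defined above:

```python
# fork the CSV parser into three independent branches, one per column,
# so each branch can be iterated without invalidating the others
csv_dp = dp.iter.FileLister([FOLDER]).filter(filter_fn=lambda f: f.endswith('ed.csv'))
csv_dp = dp.iter.FileOpener(csv_dp, mode='rt')
csv_dp = csv_dp.parse_csv(skip_lines=1, delimiter=',')
s2_dp, s1_dp, mask_dp = csv_dp.fork(num_instances=3)

s2_dp = S3FileLoader(s2_dp.map(s2getter)).map(row_processer)
s1_dp = S3FileLoader(s1_dp.map(s1getter)).map(row_processer)
mask_dp = S3FileLoader(mask_dp.map(maskgetter)).map(row_processer)

# zip keeps the three branches aligned row-by-row
ds = s2_dp.zip(s1_dp, mask_dp)
```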

So after you’ve combined the Sentinel-1 and Sentinel-2 chips using Zipper, you’ll need to use a custom collate function (Collator — TorchData 0.5.0 (beta) documentation) to put them together. Something like this (warning, untested code):

```python
import numpy as np

def sen1sen2_collate_fn(sen1_and_sen2: tuple):
    sen1_img, sen2_img = sen1_and_sen2
    # stack the two chips along a new leading axis
    img_stack = np.stack(arrays=[sen1_img, sen2_img], axis=0)
    return img_stack

dp_sen1_sen2 = dp_sen1.zip(dp_sen2)
dp_stack = dp_sen1_sen2.collate(collate_fn=sen1sen2_collate_fn)
```

This is roughly based on Stacking layers — zen3geo (disclaimer: I wrote that tutorial). It’s a bit different as it’s using xarray, but the concept of a custom collate function should still hold if you’re sticking with regular numpy arrays or torch.Tensor objects; you just need to be careful with the dimensions.

By the way, you should do the shuffling of the chips near the end, after they have been combined in the right order with the mask; otherwise the Sentinel-1 + Sentinel-2 + mask chips won’t match (see the sketch below). Good luck with the competition too!
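
Concretely, that ordering might look something like this (untested sketch; `dp_mask` is a hypothetical datapipe yielding the matching mask chips):

```python
# combine first so each sample stays aligned, then shuffle whole samples
dp_pairs = dp_sen1.zip(dp_sen2).collate(collate_fn=sen1sen2_collate_fn)
dp_samples = dp_pairs.zip(dp_mask)   # (stacked_image, mask) pairs
dp_samples = dp_samples.shuffle().batch(2)
```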


Awesome!
I ended up going with a concatenate in the collate function, but it is pretty much the same thing. I think my dataloader is finally ready, haha!

```python
def sen1sen2_collate_fn(sen1_and_sen2: tuple):
    sen1_img, sen2_img, mask = sen1_and_sen2
    # concatenate along the band axis so S1 and S2 form one multi-band image
    img_stack = np.concatenate((sen1_img, sen2_img), axis=0)
    # img_stack = np.stack(arrays=[sen1_img, sen2_img], axis=0)
    return img_stack, mask
```
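
For anyone following along, wiring that collate function into the forked pipeline sketched earlier might look roughly like this (untested; `s1_dp`, `s2_dp`, and `mask_dp` are the hypothetical branches from the fork sketch above):

```python
from torch.utils.data import DataLoader

# each sample is a (sen1, sen2, mask) triple; the collate_fn merges the
# two image chips into one multi-band array and passes the mask through
ds = s1_dp.zip(s2_dp, mask_dp).collate(collate_fn=sen1sen2_collate_fn).shuffle()

dl = DataLoader(dataset=ds, batch_size=2, num_workers=0)
images, masks = next(iter(dl))
```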

Interesting conversation @Samuel_Lewis and @weiji14. I am trying to stream the same data using PyTorch datapipes (starting with your code above), but I get the error “curlCode: 77, Problem with the SSL CA cert (path? access rights?) This exception is thrown by __iter__ of S3FileListerIterDataPipe().”

I believe it is some silly mistake I am making, or I am forgetting to do something.

I am currently using a Kaggle notebook. I have tried using a Colab notebook as well but get the same error. Am I supposed to configure the notebook or get an AWS account or something? Thanks in advance!

Are you able to configure your environment variables (as discussed here)?

Yes, @nivek, I am able to input my AWS Access Key ID, AWS Secret Access Key, Default region name, and Default output format without any errors using aws configure in the CLI. I am also able to configure my credentials in the Kaggle notebook by following instructions from here, but the error still remains.
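
For reference, this is roughly what I tried inside the notebook itself, setting the standard AWS SDK environment variables before building the datapipe (the key values and region here are placeholders, and I am not sure whether this is even the right way to address the CA-cert error):

```python
import os

# standard AWS SDK environment variables; replace the placeholders with
# your own credentials and the region the bucket lives in
os.environ["AWS_ACCESS_KEY_ID"] = "<your-access-key-id>"
os.environ["AWS_SECRET_ACCESS_KEY"] = "<your-secret-access-key>"
os.environ["AWS_DEFAULT_REGION"] = "<bucket-region>"

from torchdata.datapipes.iter import S3FileLister

# try to list one of the training chips to confirm the S3 backend connects
urls = S3FileLister(['s3://drivendata-competition-biomassters-public-us/train_features/'])
print(next(iter(urls)))
```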