PyTorch S3FileLoader help

Hi,

I am trying to create a DataLoader which streams image chips stored in an AWS S3 bucket.

I currently have a CSV file which contains the download URLs for:

  • a sentinel 1 image chip,
  • a matching sentinel 2 image chip,
  • the matching mask layer chip.

I have been trying to create a datapipe using torchdata's S3FileLister and S3FileLoader, but I am stuck!

I need to create a dataset/datapipe that streams the matching Sentinel-1 and Sentinel-2 chips, merges them into one tensor, and also yields the corresponding mask. Currently I can stream a single image chip (or a batch), but I do not know how to write the class that can stream multiple chips and perform the merging.

CURRENT CODE

```{python}
import numpy as np
from rasterio.io import MemoryFile
from torch.utils.data import DataLoader
from torchdata.datapipes.iter import S3FileLister, S3FileLoader

tests2 = ['s3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S2_00.tif',
          's3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S2_01.tif',
          's3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S2_05.tif',
          's3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S2_06.tif']

tests1 = ['s3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_00.tif',
          's3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_01.tif',
          's3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_05.tif',
          's3://drivendata-competition-biomassters-public-us/train_features/0003d2eb_S1_06.tif']


def row_processer(row):
    # row is a (url, stream) tuple yielded by S3FileLoader
    with MemoryFile(row[1]) as memfile:
        with memfile.open() as dataset:
            data_array = dataset.read().astype(np.float32)
    return data_array


def build_datapipes(urls):
    datapipe = S3FileLister(urls)
    datapipe = S3FileLoader(datapipe)
    datapipe = datapipe.shuffle()
    datapipe = datapipe.map(row_processer)
    return datapipe


if __name__ == '__main__':
    datapipe = build_datapipes(tests1)
    dl = DataLoader(dataset=datapipe, batch_size=2, num_workers=0)
    first = next(iter(dl))
    print(f"Labels batch shape: {first.size()}")
```

Using Zipper I can load both, but I'm still unsure how to concatenate them together:

```{python}
import torchdata.datapipes.iter as pipes

datapipe = S3FileLister(tests1)
datapipe = S3FileLoader(datapipe)
datapipe = datapipe.shuffle()
datapipe = datapipe.map(row_processer)

datapipe1 = S3FileLister(tests2)
datapipe1 = S3FileLoader(datapipe1)
datapipe1 = datapipe1.shuffle()
datapipe1 = datapipe1.map(row_processer)

ds = pipes.Zipper(datapipe1, datapipe).shuffle().batch(2)

batch = next(iter(ds))

for mk, dt in batch:
    print(mk.shape)
    print(dt.shape)
```

which prints:

```
(11, 256, 256)
(4, 256, 256)
(11, 256, 256)
(4, 256, 256)
```

Sorry for the very messy code, I am a bit out of my depth!
OK, so I have tried loading directly from the CSV, but I am running into this issue now.

```{python}
import torchdata.datapipes as dp

FOLDER = '/content/drive/MyDrive/biomassters/training_links/'

def s2getter(data):
    return data[1]

def s1getter(data):
    return data[2]

def maskgetter(data):
    return data[0]

datapipe = dp.iter.FileLister([FOLDER]).filter(filter_fn=lambda filename: filename.endswith('ed.csv'))
datapipe = dp.iter.FileOpener(datapipe, mode='rt')
datapipe = datapipe.parse_csv(skip_lines=1, delimiter=',')

datapipe1 = datapipe.map(s2getter)
datapipe1 = S3FileLoader(datapipe1)
datapipe1 = datapipe1.map(row_processer)

datapipe2 = datapipe.map(s1getter)
datapipe2 = S3FileLoader(datapipe2)
datapipe2 = datapipe2.map(row_processer)

datapipe3 = datapipe.map(maskgetter)
datapipe3 = S3FileLoader(datapipe3)
datapipe3 = datapipe3.map(row_processer)

ds = pipes.Zipper(datapipe1, datapipe2).shuffle().batch(1)

batch = next(iter(ds))

for mk, dt in batch:
    print(mk.shape)
    print(dt.shape)
```

This raises:

```
RuntimeError: This iterator has been invalidated because another iterator has been created from the same IterDataPipe: CSVParserIterDataPipe()
This may be caused multiple references to the same IterDataPipe. We recommend using `.fork()` if that is necessary.
For feedback regarding this single iterator per IterDataPipe constraint, feel free to comment on this issue: https://github.com/pytorch/data/issues/45.
This exception is thrown by __iter__ of CSVParserIterDataPipe()
```
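
The error message recommends `.fork()`, so I guess the parsed-CSV pipe needs to be forked into three branches instead of being mapped three times from the same pipe. Something like this (untested sketch; the branch names are just placeholders)?

```{python}
# Untested: fork the parsed-CSV pipe so each branch gets its own iterator
csv_s2, csv_s1, csv_mask = datapipe.fork(num_instances=3)

datapipe1 = S3FileLoader(csv_s2.map(s2getter)).map(row_processer)
datapipe2 = S3FileLoader(csv_s1.map(s1getter)).map(row_processer)
datapipe3 = S3FileLoader(csv_mask.map(maskgetter)).map(row_processer)
```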

So after you’ve combined both the Sentinel-1 and Sentinel-2 chips using Zipper, you’ll need to use a custom collate function (Collator — TorchData 0.5.0 (beta) documentation) to put them together. Something like this (warning, untested code):

```{python}
def sen1sen2_collate_fn(sen1_and_sen2: tuple):
    sen1_img, sen2_img = sen1_and_sen2
    img_stack = np.stack(arrays=[sen1_img, sen2_img], axis=0)

    return img_stack

dp_sen1_sen2 = dp_sen1.zip(dp_sen2)
dp_stack = dp_sen1_sen2.collate(collate_fn=sen1sen2_collate_fn)
```

This is roughly based on Stacking layers — zen3geo (disclaimer: I wrote that tutorial). It’s a bit different as it’s using xarray, but the concept of a custom collate function should still hold if you’re sticking with regular numpy arrays or torch.Tensor objects; you just need to be careful with the dimensions.
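
On that note: for the shapes you printed earlier ((4, 256, 256) for Sentinel-1 and (11, 256, 256) for Sentinel-2), `np.stack` requires identically shaped arrays, so stacking along a new axis won’t work as-is; concatenating along the existing band axis would. A quick sketch with dummy arrays:

```{python}
import numpy as np

sen1 = np.zeros((4, 256, 256), dtype=np.float32)   # dummy Sentinel-1 chip
sen2 = np.zeros((11, 256, 256), dtype=np.float32)  # dummy Sentinel-2 chip

# Band counts differ (4 vs 11), so join along the existing band axis
merged = np.concatenate([sen1, sen2], axis=0)
print(merged.shape)  # (15, 256, 256)
```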

By the way, you should do the shuffling of the chips near the end, after they have been zipped together in the right order with the mask; otherwise the Sentinel-1 + Sentinel-2 + mask chips won’t match (see the sketch below). Good luck with the competition too!
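
Roughly this ordering (untested, assuming your three chip pipes are named as in your code above):

```{python}
# Zip first so each tuple holds matching chips, then shuffle the tuples
ds = datapipe1.zip(datapipe2, datapipe3).shuffle().batch(2)
```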


Awesome!
I ended up going with a concat in the collate function, but it’s pretty much the same thing. I think my dataloader is finally ready haha!

```{python}
def sen1sen2_collate_fn(sen1_and_sen2: tuple):
    sen1_img, sen2_img, mask = sen1_and_sen2
    # Band counts differ (4 vs 11), so concatenate along the band axis:
    # (4, 256, 256) + (11, 256, 256) -> (15, 256, 256)
    img_stack = np.concatenate((sen1_img, sen2_img), axis=0)
    # img_stack = np.stack(arrays=[sen1_img, sen2_img], axis=0)

    return img_stack, mask
```