Torchdata Datapipe load multiple?

Hi,

I would like to know the best way to read a whole row of a CSV and collate all the samples into one tensor.

I have a CSV with 12 columns, each containing a URL to an image in an S3 bucket. I need to read all 12 URLs using the S3FileLoader and collate these samples into a time-series tensor (TCHW).

The complicated part is some rows contain missing data and I need to pad these as a zero array and add this to the timeseries.

I have some initial code that works, but it returns a datapipe rather than an array if there is missing data.

Can you please provide a small code snippet and sample input? I am suspecting the function that is applying the padding may be off.

1 Like

Sample Input:

from torchdata.datapipes.iter import IterableWrapper
import albumentations as A
from albumentations.pytorch import ToTensorV2

import torchdata.datapipes as dp
FOLDER = "PATH TO CSV"

def row_processer(row):
    """Decode the byte stream in ``row[1]`` into a float32 numpy array.

    ``row`` is a (url, stream) pair as yielded by S3FileLoader; the raster
    bytes are opened in memory and read in full.
    """
    # NOTE(review): MemoryFile appears to come from rasterio — its import is
    # not visible in this chunk; confirm it exists at file level.
    with MemoryFile(row[1]) as memfile, memfile.open() as dataset:
        return dataset.read().astype(np.float32)

### custom collate/ pad (NOT WORKING)
### custom collate / pad for the S2 (11-band) column
def janS2(data):
  """Load the January S2 image referenced by column 3 of a parsed CSV row.

  data: one parsed CSV row (list of strings).
  Returns a (11, 256, 256) array: zeros when the cell is empty (missing
  observation padded into the time series), otherwise the decoded image.
  """
  if data[3] == "":
    # Missing observation: pad the time series with a zero frame.
    return np.zeros((11, 256, 256))
  # S3FileLoader expects an iterable of URLs, so wrap the single URL in a
  # list, and materialise the one resulting array instead of returning the
  # lazy datapipe itself (the original bug). Also avoid rebinding the
  # module alias `dp`.
  pipe = S3FileLoader([data[3]]).map(row_processer)
  return next(iter(pipe))

def janS1(data):
  """Load the January S1 image referenced by column 2 of a parsed CSV row.

  data: one parsed CSV row (list of strings).
  Returns a (4, 256, 256) array: zeros when the cell is empty (missing
  observation padded into the time series), otherwise the decoded image.
  """
  if data[2] == "":
    # Missing observation: pad the time series with a zero frame.
    return np.zeros((4, 256, 256))
  # S3FileLoader expects an iterable of URLs, so wrap the single URL in a
  # list, and materialise the one resulting array instead of returning the
  # lazy datapipe itself (the original bug). Also avoid rebinding the
  # module alias `dp`.
  pipe = S3FileLoader([data[2]]).map(row_processer)
  return next(iter(pipe))
def maskgetter(data):
  """Return the mask URL (second column) from a parsed CSV row."""
  _, mask_url, *_ = data
  return mask_url

## Augmentation pipeline shared by apply_transform: random flips/rotations
## applied jointly to the image and its mask, then conversion to a torch tensor.
transforms = A.Compose([
  A.HorizontalFlip(p=0.5),
  #A.Rotate(limit=35, p=1.0),
  A.VerticalFlip(p=0.5),
  A.RandomRotate90(p=1),  # p=1: always rotate by a random multiple of 90 deg
  A.Transpose(p=0.5),
  ToTensorV2()  # HWC numpy array -> CHW torch tensor
  ])

def normalise(data):
  """Scale the image in data[0] to [0, 1] by its max; pass data[1] through.

  Guards against an all-zero image (e.g. a zero-padded frame produced by
  janS1/janS2 for missing rows), which would otherwise produce NaNs from
  a 0/0 division.
  """
  sentinel, mask = data[0], data[1]
  peak = np.max(sentinel)
  if peak != 0:
    sentinel = sentinel / peak
  return sentinel, mask

def apply_transform(image):
    """Augment a (sentinel, mask) pair with the module-level `transforms` pipeline.

    image[0]: CHW sentinel array — converted to HWC for albumentations.
    image[1]: (1, H, W) mask — squeezed to (H, W) for the pipeline, with the
    leading channel axis restored on the way out.
    Returns the (augmented sentinel, mask) pair.
    """
    hwc_sentinel = np.transpose(image[0], (1, 2, 0))
    flat_mask = np.squeeze(image[1], axis=0)
    out = transforms(image=hwc_sentinel, mask=flat_mask)
    return out['image'], out['mask'][None, :, :]

### SAMPLE DATAPIPE (ONLY FOR JANUARY)

def datapipe(folder, string):
  """Build the January (S1, S2, mask) datapipe.

  folder: directory containing the CSV file(s). The original body ignored
          this parameter and used the global FOLDER; it is now honoured,
          falling back to FOLDER when None is passed.
  string: filename suffix used to select the CSV to read.
  Returns a datapipe yielding (s1_array, s2_array, mask_array) triples.
  """
  root = folder if folder is not None else FOLDER
  pipe = dp.iter.FileLister([root]).filter(
      filter_fn=lambda filename: filename.endswith(string))
  pipe = dp.iter.FileOpener(pipe, mode='rt')
  pipe = pipe.parse_csv(skip_lines=1, delimiter=',').shuffle()
  pipe = IterableWrapper(pipe).sharding_filter()
  # Do NOT name the fork targets after the mapping functions: the original
  # code rebound janS1/janS2 to the forked datapipes and then mapped each
  # fork over itself (`janS1.map(janS1)`), which cannot work.
  s1_pipe, s2_pipe, mask_pipe = pipe.fork(num_instances=3)

  ## january
  s1_pipe = s1_pipe.map(janS1)
  s2_pipe = s2_pipe.map(janS2)
  # mask is never missing in dataset, so it can be loaded unconditionally
  mask_pipe = mask_pipe.map(maskgetter)
  mask_pipe = S3FileLoader(mask_pipe).map(row_processer)
  return s1_pipe.zip(s2_pipe, mask_pipe)