TorchData loading timeseries

Hi,

This may be a complicated question, but I am stuck so here goes!

I am trying to figure out how I can create a datapipe that reads a CSV containing S3 URLs for image chips. An example of the dataset looks as follows:

For each image chip there are two URLs (images) for each month, plus a mask image. The extra tricky part is that there is missing data: not every chip has images for all 12 months.

WHAT I’M TRYING TO ACHIEVE:
I am trying to use datapipes to read in these URLs and concatenate all the images for one chip into a tensor of shape (t, h, w, c), with t being the time (month), plus the mask.

I have had success (although with very messy code) in doing this where a chip has a complete set of images for every month, but when there is missing data I am not sure how to pad it out and get it to work.
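To make the target concrete, what I am picturing per chip is roughly the following (just a sketch of the shape I am after: month_arrays would be a list with one entry per month, None where that month is missing, and the 11-band 256x256 size is just what my chips happen to be):

import numpy as np

def pad_and_stack(month_arrays, mask, bands=11, size=256):
    # stack per-month arrays into (t, h, w, c), zero-padding the missing months
    frames = []
    for arr in month_arrays:                      # one entry per month, None when missing
        if arr is None:
            arr = np.zeros((bands, size, size), dtype=np.float32)
        frames.append(arr)
    stacked = np.stack(frames, axis=0)            # (t, c, h, w)
    return np.transpose(stacked, (0, 2, 3, 1)), mask   # (t, h, w, c) plus the mask

The part I cannot work out is how to get the datapipes to hand me that list of per-month arrays in the first place.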

If anyone has any suggestions at all, I am all ears. Happy to elaborate more and share my current code if needed.

Cheers,

Sam

A rudimentary bit of code:

# helper functions

import numpy as np
from rasterio.io import MemoryFile
from torchdata.datapipes.iter import S3FileLoader


def jans2getter(data):
    # if the row has no URL for this month, output a zero array as padding
    if data[3] == "":
        return np.zeros((11, 256, 256))
    # otherwise retrieve the object and convert the byte stream into an array
    dp = S3FileLoader(data[3])
    dp = dp.map(row_processer)
    return dp


def jans1getter(data):
    if data[2] == "":
        return np.zeros((4, 256, 256))
    dp = S3FileLoader(data[2])
    dp = dp.map(row_processer)
    return dp


def maskgetter(data):
    return data[1]


def row_processer(row):
    # convert an (url, byte stream) tuple from S3FileLoader into a float32 array
    with MemoryFile(row[1]) as memfile:
        with memfile.open() as dataset:
            data_array = dataset.read().astype(np.float32)
    return data_array
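One variation I have been toying with (but have not managed to verify) is to make the getter pull the array out of the inner datapipe straight away, so that .map() gives back arrays instead of datapipes. Something like this, where I am assuming S3FileLoader wants a datapipe of URLs rather than a bare string, hence the IterableWrapper:

from torchdata.datapipes.iter import IterableWrapper

def jans2getter_eager(data):
    # empty URL column -> zero padding, exactly as above
    if data[3] == "":
        return np.zeros((11, 256, 256))
    # wrap the single URL in a datapipe, load and process it, then pull the one result out
    inner = S3FileLoader(IterableWrapper([data[3]])).map(row_processer)
    return next(iter(inner))  # an array rather than a MapperIterDataPipe

No idea whether that is an abuse of the API, though.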

## training
import torchdata.datapipes as dp
from torchdata.datapipes.iter import IterableWrapper


def datapipe(folder, string):
    # list and open the csv file in `folder` whose name ends with `string`
    datapipe = dp.iter.FileLister([folder]).filter(filter_fn=lambda filename: filename.endswith(string))
    datapipe = dp.iter.FileOpener(datapipe, mode='rt')
    # read each line of the csv file (26 columns) and shuffle
    datapipe = datapipe.parse_csv(skip_lines=1, delimiter=',').shuffle()
    datapipe = IterableWrapper(datapipe).sharding_filter()

    ## sample datapipe for January only
    januaryS1, januaryS2, dpmask = datapipe.fork(num_instances=3)
    januaryS1 = januaryS1.map(jans1getter)
    januaryS2 = januaryS2.map(jans2getter)
    dpmask = dpmask.map(maskgetter)
    dpmask = S3FileLoader(dpmask)
    dpmask = dpmask.map(row_processer)
    ds = januaryS1.zip(januaryS2, dpmask)
    return ds


dataset = datapipe(FOLDER, "all.csv")
l, m, c = next(iter(dataset))

The objects come out as either a zero array (good) or a MapperIterDataPipe, and I do not know how to extract the array from inside it. The mask comes out as an array too (good).
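The only other idea I have had is to sidestep the nested datapipe entirely and do the S3 fetch myself inside the getter, e.g. with boto3, feeding the bytes straight into rasterio. Again only a sketch, and the bucket/key splitting is a guess based on my URL format:

import boto3
import numpy as np
from rasterio.io import MemoryFile

s3 = boto3.client("s3")

def fetch_array(url):
    # url assumed to look like "s3://bucket/key/to/chip.tif"
    bucket, key = url.replace("s3://", "").split("/", 1)
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    with MemoryFile(body) as memfile:
        with memfile.open() as dataset:
            return dataset.read().astype(np.float32)

But that feels like it defeats the point of using S3FileLoader, so I would love to know what the intended pattern is.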